You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2015/12/17 21:46:21 UTC

[19/26] incubator-metron git commit: replace opensoc-steaming version 0.4BETA with 0.6BETA 8e7a6b4ad9febbc4ea47ba7810c42cc94d4dee37

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/pom.xml b/opensoc-streaming/OpenSOC-EnrichmentAdapters/pom.xml
index 2f52ad8..fb21130 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/pom.xml
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/pom.xml
@@ -15,22 +15,24 @@
 	<parent>
 		<groupId>com.opensoc</groupId>
 		<artifactId>OpenSOC-Streaming</artifactId>
-		<version>0.3BETA-SNAPSHOT</version>
+		<version>0.6BETA</version>
 	</parent>
 	<artifactId>OpenSOC-EnrichmentAdapters</artifactId>
 
 	<properties>
+       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>		
 		<mysql.version>5.1.31</mysql.version>
 		<slf4j.version>1.7.7</slf4j.version>
 		<hbase.client.version>0.96.1-hadoop2</hbase.client.version>
-		<storm.hdfs.version>0.1.2</storm.hdfs.version>	
+		<storm.hdfs.version>0.1.2</storm.hdfs.version>
 		<guava.version>17.0</guava.version>
 	</properties>
 	<dependencies>
 		<dependency>
 			<groupId>com.opensoc</groupId>
 			<artifactId>OpenSOC-Common</artifactId>
-			<version>${parent.version}</version>
+			<version>${project.parent.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.slf4j</groupId>
@@ -62,12 +64,24 @@
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
 			<version>${global_hadoop_version}</version>
+			  <exclusions>
+				<exclusion>
+				   <artifactId>servlet-api</artifactId>
+				   <groupId>javax.servlet</groupId>
+				  </exclusion>
+		    </exclusions>					
 		</dependency>
 		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-core</artifactId>
 			<version>${global_storm_version}</version>
 			<scope>provided</scope>
+			  <exclusions>
+				<exclusion>
+				   <artifactId>servlet-api</artifactId>
+				   <groupId>javax.servlet</groupId>
+				  </exclusion>
+		    </exclusions>					
 		</dependency>
 		<dependency>
 			<groupId>com.google.guava</groupId>
@@ -78,56 +92,74 @@
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-common</artifactId>
 			<version>${global_hadoop_version}</version>
+			<exclusions>
+				<exclusion>
+				   <artifactId>servlet-api</artifactId>
+				   <groupId>javax.servlet</groupId>
+				  </exclusion>
+		    </exclusions>			
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${global_junit_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-validator</groupId>
+			<artifactId>commons-validator</artifactId>
+			<version>1.4.0</version>
 		</dependency>
-  		<dependency>
-  			<groupId>junit</groupId>
-  			<artifactId>junit</artifactId>
-  			<version>3.8.2</version>
-  		</dependency>	
-  		<dependency>
-  	    <groupId>commons-validator</groupId>
-    <artifactId>commons-validator</artifactId>
-    <version>1.4.0</version>
-    </dependency>	
-  		
-  			
+
 	</dependencies>
-   <reporting>
-    <plugins>
-     <plugin>
-     <groupId>org.apache.maven.plugins</groupId>
-     <artifactId>maven-surefire-plugin</artifactId>
-     	<configuration>
-	   		<systemProperties>
-	   		    <property>
-	   		         <name>mode</name>
-	   		         <value>local</value>
-	   		    </property>
-	   		</systemProperties>
-		</configuration>
-     </plugin>
-	<!-- Normally, dependency report takes time, skip it -->
-      <plugin>
-		<groupId>org.apache.maven.plugins</groupId>
-		<artifactId>maven-project-info-reports-plugin</artifactId>
-		<version>2.7</version>
-	 
-		<configuration>
-	          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-		</configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>emma-maven-plugin</artifactId>
-        <version>1.0-alpha-3</version>
-      </plugin>    
-      <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-pmd-plugin</artifactId>
-          <configuration>
-            <targetJdk>1.7</targetJdk>
-	  </configuration>
-        </plugin>        
-    </plugins>
-  </reporting>  	
+	<reporting>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<systemProperties>
+						<property>
+							<name>mode</name>
+							<value>global</value>
+						</property>
+					</systemProperties>
+				</configuration>
+			</plugin>
+			<!-- Normally, dependency report takes time, skip it -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-project-info-reports-plugin</artifactId>
+				<version>2.7</version>
+
+				<configuration>
+					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>emma-maven-plugin</artifactId>
+				<version>1.0-alpha-3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-pmd-plugin</artifactId>
+				<configuration>
+					<targetJdk>1.7</targetJdk>
+				</configuration>
+			</plugin>
+		</plugins>
+	</reporting>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/readme.md
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/readme.md b/opensoc-streaming/OpenSOC-EnrichmentAdapters/readme.md
new file mode 100644
index 0000000..7c08218
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/readme.md
@@ -0,0 +1,125 @@
+#OpenSOC-Enrichments
+
+##Module Description
+
+This module enables enrichment of message metafields with additional information from various enrichment sources.  Currently there is only a limited number of enrichments available, but this is an extensible framework that can be extended with additional enrichments.  Enrichments currently available are geo, whois, hosts, and CIF.
+
+##Message Format
+
+Enrichment bolts are designed to go after the parser bolts.  Parser bolts will parse the telemetry, taking it from its native format and producing a standard JSON that would look like so:
+
+```json
+{
+"message": 
+{"ip_src_addr": xxxx, 
+"ip_dst_addr": xxxx, 
+"ip_src_port": xxxx, 
+"ip_dst_port": xxxx, 
+"protocol": xxxx, 
+"additional-field 1": xxx,
+}
+
+}
+```
+
+A single enrichment bolt would enrich the message and produce a JSON enrichment and attach it to the message.  Enrichments are stackable so multiple enrichments can be attached sequentially after a single parser bolt.  Stacked enrichments would produce messages under the "enrichment" tag and attach it to the message like so:
+
+```json
+{
+"message": 
+{"ip_src_addr": xxxx, 
+"ip_dst_addr": xxxx, 
+"ip_src_port": xxxx, 
+"ip_dst_port": xxxx, 
+"protocol": xxxx, 
+"additional-field 1": xxxx,
+},
+"enrichment" : {"geo": xxxx, "whois": xxxx, "hosts": xxxxx, "CIF": "xxxxx"}
+
+}
+```
+
+##Enrichment Sources
+
+Each enrichment has to have an anrichment source which can serve as a lookup table for enriching relevant message fields.  In order to minimize the use of additional platforms and tools we primarily try to rely on HBase as much as possible to store the enrichment information for lookup by key.  In order to use Hbase we have to pre-process the enrichment feeds for bulk-loading into HBase with specific key format optimized for retrieval as well as utilize caches within the enrichment bolts to be able to provide enrichments real-time.  Our wiki contains information on how to setup the environment, pre-process feeds, and plug in the enrichment sources.
+
+##Enrichment Bolt
+
+The enrichment bolt is designed to be extensible to be re-used for all kinds of enrichment processes.  The bolt signature for declaration in a storm topology is as follows:
+
+
+
+```
+GenericEnrichmentBolt geo_enrichment = new GenericEnrichmentBolt()
+.withEnrichmentTag(
+config.getString("bolt.enrichment.geo.enrichment_tag"))
+.withAdapter(geo_adapter)
+.withMaxTimeRetain(
+config.getInt("bolt.enrichment.geo.MAX_TIME_RETAIN_MINUTES"))
+.withMaxCacheSize(
+config.getInt("bolt.enrichment.geo.MAX_CACHE_SIZE_OBJECTS_NUM"))
+.withKeys(geo_keys).withMetricConfiguration(config);
+
+```
+
+EnrichmentTag - Name of the enrichment (geo, whois, hosts, etc)
+Keys - Keys which this enrichment is able to enrich (hosts field for hosts enrichment, source_ip, dest_ip, for geo enrichment, etc)
+MaxTimeToRetain & MaxCacheSize - define the caching policy of the enrichment bolt
+Adapter - which adapter to use with the enrichment bolt instance
+
+###Geo Adapter
+Geo adapter is able to do geo enrichment on hosts and destination IPs.  The open source verison of the geo adapter uses the free Geo feeds from MaxMind.  The format of these feeds does not easily lend itself to a no-sql DB so this adapter is designed to work with mySql.  But it is extensible enough to be made work with a variety of other back ends.
+
+The signature of a geo adapter is as follows;
+
+```
+GeoMysqlAdapter geo_adapter = new GeoMysqlAdapter(
+config.getString("mysql.ip"), config.getInt("mysql.port"),
+config.getString("mysql.username"),
+config.getString("mysql.password"),
+config.getString("bolt.enrichment.geo.adapter.table"));
+
+```
+
+###Hosts Adapter
+The hosts adapter is designed to enrich message format with the static host information that can be read from a standard text file.  This adapter is intended for use with a network crawling script that can identify all customer assets and place them in a text file.  For example, this script would identify all workstations, printers, appliantces, etc.  Then if any of these assets are seen in the telemetry messages flowing through the adapter this enrichment would fire and the relevant known information about a host would be attached.  We are currently working on porting this adapter to work with HBase, but this work is not ready yet.  The known hosts file is located under the /etc/whitelists config directory of OpenSOC.
+
+The signature of the hosts adapter is as follows:
+
+```
+Map<String, JSONObject> known_hosts = SettingsLoader
+.loadKnownHosts(hosts_path);
+
+HostFromPropertiesFileAdapter host_adapter = new HostFromPropertiesFileAdapter(
+known_hosts);
+
+```
+* The source and dest ips refer to the name of the message JSON key where the host information is located
+
+###Whois Adapter
+Whois adapter enriches the host name with additional whois information obtained from our proprietary Cisco feed.  The enricher itself is provided in this open source distribution, but the feed is not.  You have to have your own feed in order to use it.  Alternatively, you can contact us for providing you with this feed, but we would have to charge you a fee (we can't distribute it for free). The implemetation of the whois enrichment we provide works with HBase
+
+The signature of the whois adapter is as follows:
+
+```
+
+EnrichmentAdapter whois_adapter = new WhoisHBaseAdapter(
+config.getString("bolt.enrichment.whois.hbase.table.name"),
+config.getString("kafka.zk.list"),
+config.getString("kafka.zk.port"));
+```
+
+###CIF Adapter
+CIF adapter is designed to take in CIF feeds and cross-reference them against every message processed by Storm.  If there is a hit then the relevant information is attached to the message.  
+
+The signature of the CIF adapter is as follows:
+
+```
+CIFHbaseAdapter = new CIFHbaseAdapter(config
+.getString("kafka.zk.list"), config
+.getString("kafka.zk.port"), config
+.getString("bolt.enrichment.cif.tablename")))
+```
+
+##Stacking Enrichments
+Enrichments can be stacked.  By default each enrichment bolt listens on the "message" stream.  In order to create and stack enrichment bolts create a new bolt and instantiate the appropariate adapter.  You can look at our sample topologies to see how enrichments can be stacked
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapter.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapter.java
index cfb6673..d62632b 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapter.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapter.java
@@ -35,7 +35,7 @@ public class GeoMysqlAdapter extends AbstractGeoAdapter {
 	private String _username;
 	private String _password;
 	private String _tablename;
-	InetAddressValidator ipvalidator = new InetAddressValidator();
+	private InetAddressValidator ipvalidator = new InetAddressValidator();
 
 	public GeoMysqlAdapter(String ip, int port, String username,
 			String password, String tablename) {
@@ -141,6 +141,8 @@ public class GeoMysqlAdapter extends AbstractGeoAdapter {
 			jo.put("longitude", resultSet.getString("longitude"));
 			jo.put("dmaCode", resultSet.getString("dmaCode"));
 			jo.put("locID", resultSet.getString("locID"));
+			
+			jo.put("location_point", jo.get("longitude") + "," + jo.get("latitude"));
 
 			_LOG.debug("Returning enrichment: " + jo);
 
@@ -172,7 +174,6 @@ public class GeoMysqlAdapter extends AbstractGeoAdapter {
 
 			_LOG.info("[OpenSOC] Set JDBC connection....");
 
-
 			return true;
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/host/HostFromPropertiesFileAdapter.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
index b393fb5..e6f693a 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.json.simple.JSONObject;
 
+@SuppressWarnings("serial")
 public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
 	
 	Map<String, JSONObject> _known_hosts;
@@ -40,7 +41,8 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
 			return false;
 	}
 
-	@Override
+	@SuppressWarnings("unchecked")
+    @Override
 	public JSONObject enrich(String metadata) {
 		
 		

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/AbstractThreatAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/AbstractThreatAdapter.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/AbstractThreatAdapter.java
new file mode 100644
index 0000000..395ee48
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/AbstractThreatAdapter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.threat;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+
+public abstract class AbstractThreatAdapter implements EnrichmentAdapter,Serializable{
+
+	
+	private static final long serialVersionUID = 1524030932856141771L;
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractThreatAdapter.class);
+	
+	abstract public boolean initializeAdapter();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/ThreatHbaseAdapter.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/ThreatHbaseAdapter.java
new file mode 100644
index 0000000..97d02d4
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/threat/ThreatHbaseAdapter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.threat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.log4j.Logger;
+
+@SuppressWarnings("unchecked")
+public class ThreatHbaseAdapter extends AbstractThreatAdapter {
+
+	private static final long serialVersionUID = 1L;
+	private String _tableName;
+	private HTableInterface table;
+	private String _quorum;
+	private String _port;
+
+	public ThreatHbaseAdapter(String quorum, String port, String tableName) {
+		_quorum = quorum;
+		_port = port;
+		_tableName = tableName;
+	}
+
+	/** The LOGGER. */
+	private static final Logger LOGGER = Logger
+			.getLogger(ThreatHbaseAdapter.class);
+
+	public JSONObject enrich(String metadata) {
+
+		JSONObject output = new JSONObject();
+		LOGGER.debug("=======Looking Up For:" + metadata);
+		output.putAll(getThreatObject(metadata));
+
+		return output;
+	}
+
+	@SuppressWarnings({ "rawtypes", "deprecation" })
+	protected Map getThreatObject(String key) {
+
+		LOGGER.debug("=======Pinging HBase For:" + key);
+		
+		Get get = new Get(Bytes.toBytes(key));
+		Result rs;
+		Map output = new HashMap();
+
+		try {
+			rs = table.get(get);
+
+			if (!rs.isEmpty()) {
+				byte[] source_family = Bytes.toBytes("source");
+				JSONParser parser = new JSONParser();
+				
+				Map<byte[], byte[]> sourceFamilyMap = rs.getFamilyMap(source_family);
+				
+				for (Map.Entry<byte[], byte[]> entry  : sourceFamilyMap.entrySet()) {
+					String k = Bytes.toString(entry.getKey());
+					LOGGER.debug("=======Found intel from source: " + k);
+					output.put(k,parser.parse(Bytes.toString(entry.getValue())));
+	            }
+			}
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (ParseException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		return output;
+	}
+
+	@Override
+	public boolean initializeAdapter() {
+
+		// Initialize HBase Table
+		Configuration conf = null;
+		conf = HBaseConfiguration.create();
+		conf.set("hbase.zookeeper.quorum", _quorum);
+		conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		try {
+			LOGGER.debug("=======Connecting to HBASE===========");
+			LOGGER.debug("=======ZOOKEEPER = "
+					+ conf.get("hbase.zookeeper.quorum"));
+			HConnection connection = HConnectionManager.createConnection(conf);
+			table = connection.getTable(_tableName);
+			return true;
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			LOGGER.debug("=======Unable to Connect to HBASE===========");
+			e.printStackTrace();
+		}
+
+		return false;
+	}
+
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapter.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapter.java
index 838f8fe..503618a 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -18,6 +18,7 @@
 package com.opensoc.enrichment.adapters.whois;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -29,6 +30,9 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.json.simple.JSONObject;
 
+import com.google.common.base.Joiner;
+import com.opensoc.tldextractor.BasicTldExtractor;
+
 public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
 
 	/**
@@ -39,6 +43,7 @@ public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
 	private String _table_name;
 	private String _quorum;
 	private String _port;
+	private BasicTldExtractor tldex = new BasicTldExtractor();
 
 	public WhoisHBaseAdapter(String table_name, String quorum, String port) {
 		_table_name = table_name;
@@ -88,11 +93,13 @@ public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
 	}
 
 	@SuppressWarnings({ "unchecked", "deprecation" })
-	public JSONObject enrich(String metadata) {
+	public JSONObject enrich(String metadataIn) {
+		
+		String metadata = tldex.extract2LD(metadataIn);
 
 		LOG.trace("[OpenSOC] Pinging HBase For:" + metadata);
 
-
+        
 		JSONObject output = new JSONObject();
 		JSONObject payload = new JSONObject();
 
@@ -108,12 +115,22 @@ public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
 			output.put("whois", payload);
 
 		} catch (IOException e) {
-			output.put(metadata, "{}");
+			payload.put(metadata, "{}");
+			output.put("whois", payload);
 			e.printStackTrace();
 		}
 
 		return output;
 
 	}
+	
+//	private String format(String input) {
+//		String output = input;
+//		String[] tokens = input.split("\\.");
+//		if(tokens.length > 2) {
+//			output = Joiner.on(".").join(Arrays.copyOfRange(tokens, 1, tokens.length));;
+//		}
+//		return output;
+//	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/AbstractEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/AbstractEnrichmentBolt.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/AbstractEnrichmentBolt.java
index f7aa0fa..be1ef96 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/AbstractEnrichmentBolt.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/AbstractEnrichmentBolt.java
@@ -51,8 +51,8 @@ public abstract class AbstractEnrichmentBolt extends BaseRichBolt {
 	protected String _OutputFieldName;
 
 	protected String _enrichment_tag;
-	protected Long _MAX_CACHE_SIZE;
-	protected Long _MAX_TIME_RETAIN;
+	protected Long _MAX_CACHE_SIZE_OBJECTS_NUM;
+	protected Long _MAX_TIME_RETAIN_MINUTES;
 
 	// JSON Keys to be enriched
 	protected List<String> _jsonKeys;
@@ -86,10 +86,10 @@ public abstract class AbstractEnrichmentBolt extends BaseRichBolt {
 			throw new IllegalStateException("OutputFieldName must be specified");
 		if (this._enrichment_tag == null)
 			throw new IllegalStateException("enrichment_tag must be specified");
-		if (this._MAX_CACHE_SIZE == null)
-			throw new IllegalStateException("MAX_CACHE_SIZE must be specified");
-		if (this._MAX_TIME_RETAIN == null)
-			throw new IllegalStateException("MAX_TIME_RETAIN must be specified");
+		if (this._MAX_CACHE_SIZE_OBJECTS_NUM == null)
+			throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
+		if (this._MAX_TIME_RETAIN_MINUTES == null)
+			throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
 		if (this._adapter == null)
 			throw new IllegalStateException("Adapter must be specified");
 		if (this._jsonKeys == null)
@@ -102,8 +102,8 @@ public abstract class AbstractEnrichmentBolt extends BaseRichBolt {
 			}
 		};
 
-		cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE)
-				.expireAfterWrite(_MAX_TIME_RETAIN, TimeUnit.MINUTES)
+		cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+				.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
 				.build(loader);
 
 		boolean success = _adapter.initializeAdapter();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/GenericEnrichmentBolt.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/GenericEnrichmentBolt.java
index 2735a51..37c151f 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/GenericEnrichmentBolt.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/java/com/opensoc/enrichment/common/GenericEnrichmentBolt.java
@@ -34,9 +34,9 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
 import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+import com.opensoc.helpers.topology.ErrorGenerator;
 import com.opensoc.json.serialization.JSONEncoderHelper;
 import com.opensoc.metrics.MetricReporter;
-import com.opensoc.topologyhelpers.ErrorGenerator;
 
 /**
  * Uses an adapter to enrich telemetry messages with additional metadata
@@ -99,24 +99,24 @@ public class GenericEnrichmentBolt extends AbstractEnrichmentBolt {
 	}
 
 	/**
-	 * @param MAX_CACHE_SIZE
+	 * @param MAX_CACHE_SIZE_OBJECTS_NUM
 	 *            Maximum size of cache before flushing
 	 * @return Instance of this class
 	 */
 
-	public GenericEnrichmentBolt withMaxCacheSize(long MAX_CACHE_SIZE) {
-		_MAX_CACHE_SIZE = MAX_CACHE_SIZE;
+	public GenericEnrichmentBolt withMaxCacheSize(long MAX_CACHE_SIZE_OBJECTS_NUM) {
+		_MAX_CACHE_SIZE_OBJECTS_NUM = MAX_CACHE_SIZE_OBJECTS_NUM;
 		return this;
 	}
 
 	/**
-	 * @param MAX_TIME_RETAIN
+	 * @param MAX_TIME_RETAIN_MINUTES
 	 *            Maximum time to retain cached entry before expiring
 	 * @return Instance of this class
 	 */
 
-	public GenericEnrichmentBolt withMaxTimeRetain(long MAX_TIME_RETAIN) {
-		_MAX_TIME_RETAIN = MAX_TIME_RETAIN;
+	public GenericEnrichmentBolt withMaxTimeRetain(long MAX_TIME_RETAIN_MINUTES) {
+		_MAX_TIME_RETAIN_MINUTES = MAX_TIME_RETAIN_MINUTES;
 		return this;
 	}
 
@@ -186,6 +186,11 @@ public class GenericEnrichmentBolt extends AbstractEnrichmentBolt {
 							+ "not present in message " + message);
 					continue;
 				}
+				
+				// If the field is empty, no need to enrich
+				if ( jsonvalue.length() == 0) {
+					continue;
+				}
 
 				JSONObject enrichment = cache.getUnchecked(jsonvalue);
 				LOG.trace("[OpenSOC] Enriched: " + jsonkey + " -> "
@@ -239,7 +244,7 @@ public class GenericEnrichmentBolt extends AbstractEnrichmentBolt {
 				failCounter.inc();
 			}
 			
-			JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + in_json, e.toString());
+			JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + in_json, e);
 			_collector.emit("error", new Values(error));
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/resources/hbase-site.xml b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/resources/hbase-site.xml
index dc7cba5..8d812a9 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/resources/hbase-site.xml
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/main/resources/hbase-site.xml
@@ -1,90 +1,131 @@
-<!--Tue Feb 11 02:34:08 2014 -->
-<configuration>
-
-	<property>
-		<name>hbase.regionserver.global.memstore.lowerLimit</name>
-		<value>0.38</value>
-	</property>
-	<property>
-		<name>zookeeper.session.timeout</name>
-		<value>20</value>
-	</property>
-
-	<property>
-		<name>hbase.security.authorization</name>
-		<value>false</value>
-	</property>
-	<property>
-		<name>hbase.cluster.distributed</name>
-		<value>true</value>
-	</property>
-	
-	<property>
-		<name>hbase.hstore.flush.retries.number</name>
-		<value>120</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.block.multiplier</name>
-		<value>4</value>
-	</property>
-	<property>
-		<name>hbase.hstore.blockingStoreFiles</name>
-		<value>200</value>
-	</property>
-	<property>
-		<name>hbase.defaults.for.version.skip</name>
-		<value>true</value>
-	</property>
-	<property>
-		<name>hbase.regionserver.global.memstore.upperLimit</name>
-		<value>0.4</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.mslab.enabled</name>
-		<value>true</value>
-	</property>
-	<property>
-		<name>hbase.client.keyvalue.maxsize</name>
-		<value>10485760</value>
-	</property>
-	<property>
-		<name>hbase.superuser</name>
-		<value>hbase</value>
-	</property>
-	<property>
-		<name>hfile.block.cache.size</name>
-		<value>0.40</value>
-	</property>
-	<property>
-		<name>zookeeper.znode.parent</name>
-		<value>/hbase-unsecure</value>
-	</property>
-	<property>
-		<name>hbase.hregion.max.filesize</name>
-		<value>10737418240</value>
-	</property>
-	<property>
-		<name>hbase.zookeeper.property.clientPort</name>
-		<value>2181</value>
-	</property>
-	<property>
-		<name>hbase.security.authentication</name>
-		<value>simple</value>
-	</property>
-	<property>
-		<name>hbase.client.scanner.caching</name>
-		<value>100</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.flush.size</name>
-		<value>134217728</value>
-	</property>
-	<property>
-		<name>hbase.hregion.majorcompaction</name>
-		<value>86400000</value>
-	</property>
-	<property>
-		<name>hbase.client.write.buffer</name>
-		<value>500000000</value>
-	</property>
-</configuration>
\ No newline at end of file
+<!--Tue Apr  1 18:16:39 2014-->
+  <configuration>
+    <property>
+    <name>hbase.tmp.dir</name>
+    <value>/disk/h/hbase</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
+    <value>0.5</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.codecs</name>
+    <value>lzo,gz,snappy</value>
+  </property>
+    <property>
+    <name>hbase.hstore.flush.retries.number</name>
+    <value>120</value>
+  </property>
+    <property>
+    <name>hbase.client.keyvalue.maxsize</name>
+    <value>10485760</value>
+  </property>
+    <property>
+    <name>hbase.rootdir</name>
+    <value>hdfs://nn1:8020/apps/hbase/data</value>
+  </property>
+    <property>
+    <name>hbase.defaults.for.version.skip</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.client.scanner.caching</name>
+    <value>100</value>
+  </property>
+    <property>
+    <name>hbase.superuser</name>
+    <value>hbase</value>
+  </property>
+    <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.40</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.checksum.verify</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.enabled</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>107374182400</value>
+  </property>
+    <property>
+    <name>hbase.cluster.distributed</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>zookeeper.session.timeout</name>
+    <value>30000</value>
+  </property>
+    <property>
+    <name>zookeeper.znode.parent</name>
+    <value>/hbase-unsecure</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.lowerLimit</name>
+    <value>0.38</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>240</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.chunksize</name>
+    <value>8388608</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>zkpr1,zkpr2,zkpr3</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.useMulti</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.majorcompaction</name>
+    <value>86400000</value>
+  </property>
+    <property>
+    <name>hbase.hstore.blockingStoreFiles</name>
+    <value>200</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>2181</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.flush.size</name>
+    <value>134217728</value>
+  </property>
+    <property>
+    <name>hbase.security.authorization</name>
+    <value>false</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.upperLimit</name>
+    <value>0.4</value>
+  </property>
+    <property>
+    <name>hbase.hstore.compactionThreshold</name>
+    <value>4</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.block.multiplier</name>
+    <value>8</value>
+  </property>
+    <property>
+    <name>hbase.security.authentication</name>
+    <value>simple</value>
+  </property>
+    <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hdfs/dn_socket</value>
+  </property>
+  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/cif/CIFHbaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/cif/CIFHbaseAdapterTest.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/cif/CIFHbaseAdapterTest.java
index e7810d4..82390e9 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/cif/CIFHbaseAdapterTest.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/cif/CIFHbaseAdapterTest.java
@@ -18,6 +18,7 @@
  */
 package com.opensoc.enrichment.adapters.cif;
 
+import java.net.InetAddress;
 import java.util.Properties;
 
 import com.opensoc.test.AbstractTestContext;
@@ -37,6 +38,7 @@ public class CIFHbaseAdapterTest extends AbstractTestContext {
     private static CIFHbaseAdapter cifHbaseAdapter=null;
 
 
+
     /**
      * Constructs a new <code>CIFHbaseAdapterTest</code> instance.
      * @param name
@@ -70,8 +72,33 @@ public class CIFHbaseAdapterTest extends AbstractTestContext {
 
     protected void setUp() throws Exception {
         super.setUp();
+        
         Properties prop = super.getTestProperties();
         assertNotNull(prop);
+        
+        if(skipTests(this.getMode())){
+            return;//skip tests
+        }
+        
+        String[] zk = prop.get("kafka.zk.list").toString().split(",");
+        
+        for(String z : zk)
+        {
+        	InetAddress address = InetAddress.getByName(z);
+            boolean reachable = address.isReachable(100);
+
+            if(!reachable)
+            {
+            	this.setMode("local");
+            	//throw new Exception("Unable to reach zookeeper, skipping CIF adapter test");
+            	break;
+            }
+            
+        }
+        
+        if(skipTests(this.getMode()))
+            return;//skip tests
+            
         System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.list"));
         System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.port"));   
         System.out.println("kafka.zk.list ="+(String) prop.get("bolt.enrichment.cif.tablename"));   

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapterTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapterTest.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapterTest.java
index 173819b..ca54500 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapterTest.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/geo/GeoMysqlAdapterTest.java
@@ -16,21 +16,22 @@
  */
 package com.opensoc.enrichment.adapters.geo;
 
+import java.net.URL;
 import java.util.Properties;
 
 import org.json.simple.JSONObject;
 
-import com.opensoc.test.AbstractTestContext;
+import com.opensoc.test.AbstractSchemaTest;
 
  /**
  * <ul>
- * <li>Title: </li>
- * <li>Description: </li>
+ * <li>Title: GeoMySqlAdapterTest</li>
+ * <li>Description: Tests for GeoMySqlAdapter</li>
  * <li>Created: Aug 25, 2014</li>
  * </ul>
  * @version $Revision: 1.1 $
  */
-public class GeoMysqlAdapterTest extends AbstractTestContext  {
+public class GeoMysqlAdapterTest extends AbstractSchemaTest {
 
     private static GeoMysqlAdapter geoMySqlAdapter=null;
     private static boolean connected=false;
@@ -72,9 +73,12 @@ public class GeoMysqlAdapterTest extends AbstractTestContext  {
             System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
             return;//skip tests
        }else{
-        geoMySqlAdapter=new GeoMysqlAdapter((String)prop.get("mysql.ip"), (new Integer((String)prop.get("mysql.port"))).intValue(),(String)prop.get("mysql.username"),(String)prop.get("mysql.password"), (String)prop.get("bolt.enrichment.geo.adapter.table"));
-        connected =geoMySqlAdapter.initializeAdapter();
-        assertTrue(connected);
+           GeoMysqlAdapterTest.setGeoMySqlAdapter(new GeoMysqlAdapter((String)prop.get("mysql.ip"), (new Integer((String)prop.get("mysql.port"))).intValue(),(String)prop.get("mysql.username"),(String)prop.get("mysql.password"), (String)prop.get("bolt.enrichment.geo.adapter.table")));
+           connected =geoMySqlAdapter.initializeAdapter();
+           assertTrue(connected);
+           URL schema_url = getClass().getClassLoader().getResource(
+               "TestSchemas/GeoMySqlSchema.json");
+           super.setSchemaJsonString(super.readSchemaFromFile(schema_url));  
        }
     }
 
@@ -85,7 +89,7 @@ public class GeoMysqlAdapterTest extends AbstractTestContext  {
 
     protected void tearDown() throws Exception {
         super.tearDown();
-        geoMySqlAdapter=null;
+        GeoMysqlAdapterTest.setGeoMySqlAdapter(null);
     }
 
     /**
@@ -95,16 +99,24 @@ public class GeoMysqlAdapterTest extends AbstractTestContext  {
         if(skipTests(this.getMode())){
             return;//skip tests
        }else{
-        JSONObject json = geoMySqlAdapter.enrich("72.163.4.161");
-        
-        //assert Geo Response is not null
-        assertNotNull(json);
-        
-        //assert LocId is not null
-        assertNotNull(json.get("locID"));
+           
+         try {           
+                JSONObject json = geoMySqlAdapter.enrich("72.163.4.161");
+                
+                //assert Geo Response is not null
+                System.out.println("json ="+json);
+                assertNotNull(json);
         
-        //assert right LocId is being returned
-        assertEquals("4522",json.get("locID"));
+                assertEquals(true, super.validateJsonData(super.getSchemaJsonString(), json.toString()));
+                //assert LocId is not null
+                assertNotNull(json.get("locID"));
+                
+                //assert right LocId is being returned
+                assertEquals("4522",json.get("locID"));    
+         } catch (Exception e) {
+            e.printStackTrace();
+            fail("Json validation Failed");
+         }
        }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapterTest.java b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
index 3d2e219..3057c13 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/java/com/opensoc/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
@@ -18,6 +18,7 @@
  */
 package com.opensoc.enrichment.adapters.whois;
 
+import java.net.InetAddress;
 import java.util.Properties;
 
 import org.json.simple.JSONObject;
@@ -68,13 +69,39 @@ public class WhoisHBaseAdapterTest extends AbstractTestContext {
         super.setUp();
         Properties prop = super.getTestProperties();
         assertNotNull(prop);   
+        
         if(skipTests(this.getMode())){
             return;//skip tests
-       }else{ 
+        }
+        
+        String[] zk = prop.get("kafka.zk.list").toString().split(",");
+        
+        for(String z : zk)
+        {
+        	InetAddress address = InetAddress.getByName(z);
+            boolean reachable = address.isReachable(100);
+
+            if(!reachable)
+            {
+            	this.setMode("local");
+            	break;
+            	//throw new Exception("Unable to reach zookeeper, skipping WHois adapter test");
+            }
+            
+            System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.list"));
+            System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.port"));   
+            System.out.println("kafka.zk.list ="+(String) prop.get("bolt.enrichment.cif.tablename")); 
+            
+        }
+        
+        if(skipTests(this.getMode())){
+            System.out.println("Local Mode Skipping tests !! ");
+        }else{
             whoisHbaseAdapter=new WhoisHBaseAdapter((String)prop.get("bolt.enrichment.whois.hbase.table.name"),(String)prop.get("kafka.zk.list"),(String)prop.get("kafka.zk.port"));
             connected =whoisHbaseAdapter.initializeAdapter();
             assertTrue(connected);
-       }
+        }
+       
     }
 
     /* 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
index 43ef4f6..8217353 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
@@ -1,11 +1,11 @@
 kafka.zk.port=2181
-kafka.zk.list=zkpr1,zkpr2,zkpr3
-kafka.zk=zkpr1:2181,zkpr2:2181,zkpr3:2181
+kafka.zk.list=zkpr1
+kafka.zk=zkpr1:2181
 
 #CIF Enrichment
 bolt.enrichment.cif.tablename=cif_table
 bolt.enrichment.cif.host=tld
 bolt.enrichment.cif.email=email
-bolt.enrichment.cif.MAX_CACHE_SIZE=10000
-bolt.enrichment.cif.MAX_TIME_RETAIN=10
+bolt.enrichment.cif.MAX_CACHE_SIZE_OBJECTS_NUM=10000
+bolt.enrichment.cif.MAX_TIME_RETAIN_MINUTES=10
 bolt.enrichment.cif.enrichment_tag=cif

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
index fe95233..3a4e179 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
@@ -1,11 +1,11 @@
 mysql.ip=172.30.9.120
-mysql.port=0
+mysql.port=3306
 mysql.username=test
 mysql.password=123123
 
 #GeoEnrichment
-
 bolt.enrichment.geo.enrichment_tag=geo
 bolt.enrichment.geo.adapter.table=GEO
-bolt.enrichment.geo.MAX_CACHE_SIZE=10000
-bolt.enrichment.geo.MAX_TIME_RETAIN=10
+bolt.enrichment.geo.MAX_CACHE_SIZE_OBJECTS_NUM=10000
+bolt.enrichment.geo.MAX_TIME_RETAIN_MINUTES=10
+bolt.enrichment.geo.source=ip_src_addr,ip_dst_addr

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/CIFHbaseSchema.json
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/CIFHbaseSchema.json b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/CIFHbaseSchema.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json
new file mode 100644
index 0000000..c4f2a82
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json
@@ -0,0 +1,42 @@
+{
+"title": "GeoMySql Schema",
+"type": "object",
+"properties": {
+
+         "city"    : {
+					   "type": "string"
+				  },
+		 "country" : {
+						"type": "string"
+					},
+		 "dmaCode" :
+		 			 {
+						"type": "string"
+					},
+	     "geoHash" : 
+	     			{
+						"type": "string"
+					},
+		 "latitude" : 
+		 			{
+						"type": "string"
+				   },
+		 "locID" : 
+		 			{
+					   "type": "string"
+				   },
+		 "location_point" : 
+		 			{
+					   "type": "string"
+				    },
+		 "longitude" : 
+		 			{
+						"type": "string"
+					},
+		 "postalCode" : 
+		 			{
+						"type": "string"
+					}
+   },
+   "required": ["city", "country", "dmaCode","latitude","locID","location_point","postalCode"]
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/WhoisHbaseSchema.json
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/WhoisHbaseSchema.json b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/TestSchemas/WhoisHbaseSchema.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
index b80dfcd..4f264ed 100644
--- a/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
+++ b/opensoc-streaming/OpenSOC-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
@@ -1,11 +1,11 @@
 kafka.zk.port=2181
-kafka.zk.list=zkpr1,zkpr2,zkpr3
-kafka.zk=zkpr1:2181,zkpr2:2181,zkpr3:2181
+kafka.zk.list=zkpr1
+kafka.zk=zkpr1:2181
 
 #WhoisEnrichment
 
 bolt.enrichment.whois.hbase.table.name=whois
 bolt.enrichment.whois.enrichment_tag=whois
 bolt.enrichment.whois.source=tld
-bolt.enrichment.whois.MAX_CACHE_SIZE=10000
-bolt.enrichment.whois.MAX_TIME_RETAIN=10
+bolt.enrichment.whois.MAX_CACHE_SIZE_OBJECTS_NUM=10000
+bolt.enrichment.whois.MAX_TIME_RETAIN_MINUTES=10

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/pom.xml b/opensoc-streaming/OpenSOC-Indexing/pom.xml
index 96e2bd0..d55ab7f 100644
--- a/opensoc-streaming/OpenSOC-Indexing/pom.xml
+++ b/opensoc-streaming/OpenSOC-Indexing/pom.xml
@@ -15,11 +15,13 @@
 	<parent>
 		<groupId>com.opensoc</groupId>
 		<artifactId>OpenSOC-Streaming</artifactId>
-		<version>0.3BETA-SNAPSHOT</version>
+		<version>0.6BETA</version>
 	</parent>
 	<artifactId>OpenSOC-Indexing</artifactId>
 	<properties>
-		<elastic.search.version>1.2.1</elastic.search.version>
+       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>		
+		<elastic.search.version>1.3.1</elastic.search.version>
 		<http.client.version>4.3.4</http.client.version>
 		<jsonsimple.version>1.1.1</jsonsimple.version>
 	</properties>
@@ -28,13 +30,19 @@
 		<dependency>
 			<groupId>com.opensoc</groupId>
 			<artifactId>OpenSOC-Common</artifactId>
-			<version>${parent.version}</version>
+			<version>${project.parent.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-core</artifactId>
 			<version>${global_storm_version}</version>
 			<scope>provided</scope>
+			  <exclusions>
+				<exclusion>
+				   <artifactId>servlet-api</artifactId>
+				   <groupId>javax.servlet</groupId>
+				  </exclusion>
+		    </exclusions>					
 		</dependency>
 		<dependency>
 			<groupId>org.elasticsearch</groupId>
@@ -86,4 +94,4 @@
       </plugin>
     </plugins>
   </reporting>  	
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/readme.md
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/readme.md b/opensoc-streaming/OpenSOC-Indexing/readme.md
new file mode 100644
index 0000000..bd9d7ac
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Indexing/readme.md
@@ -0,0 +1,61 @@
+#OpenSOC-Indexing
+
+##Module Description
+
+This module provides the indexing capability to OpenSOC components.  The primary indexing engine for now is Elastic Search, but Solr may be supported at some point in the future as well.  There are three types of messages that are commonly indexed in OpenSOC topologies: messages, alerts, and errors.  Messages are telemetry messages parsed by the parser bolt.  Alerts are alerts generated by the alerts bolt.  Errors are an optional feature where each OpenSOC bolt in addition to outputting errors in the log file will also index them for immediate analysis.
+
+###Index bolt
+
+The signature of the index bolt is as follows:
+
+```
+TelemetryIndexingBolt indexing_bolt = new TelemetryIndexingBolt()
+.withIndexIP(config.getString("es.ip"))
+.withIndexPort(config.getInt("es.port"))
+.withClusterName(config.getString("es.clustername"))
+.withIndexName(
+config.getString("bolt.error.indexing.indexname"))
+.withDocumentName(
+config.getString("bolt.error.indexing.documentname"))
+.withBulk(config.getInt("bolt.error.indexing.bulk"))
+.withIndexAdapter(adapter)
+.withMetricConfiguration(config);
+
+```
+
+###IndexAdapters
+
+*com.opensoc.indexing.adapters.ESBaseBulkAdapter - bulk ingest messages into Elastic Search
+*com.opensoc.indexing.adapters.ESBaseBulkRotatingAdapter - does everything adapter above does, but is able to rotate the index names based on size
+*com.opensoc.indexing.adapters.ESTimedBulkRotatingAdapter - does everything adapter above does, but is able to rotate the index names based on size and time
+*com.opensoc.indexing.adapters.SolrAdapter - currently under development
+
+/etc/ directory contains all environment-related configs
+
+##Sample Input and Generator Spout
+
+The sample input for topologies provided in this release was checked in here:
+
+```
+https://github.com/OpenSOC/opensoc-streaming/tree/master/OpenSOC-Topologies/src/main/resources/SampleInput
+```
+
+We provide a generator spout that is able to drive these topologies.  In production we run with the kafka spout, but for documentation on that please reference the Storm project documentation
+
+The generator spout comes with the following signature:
+
+```
+GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+.withFilename(test_file_path).withRepeating(
+config.getBoolean("spout.test.parallelism.repeat"));
+```
+
+* the repeat variable defines if the generator spout will loop through the input or stop once it gets to the end of file
+
+###Additional Storm Bolts
+In addition to custom bolts developed for OpenSOC we utilize standard bolts and spouts included with the Storm release.  We will not provide documentation for these spouts and bolts since they are provided as part of Storm.  These spouts bolts are:
+
+* KafkaSpout
+* KafkaBolt
+* HDFSBolt
+* HBaseBolt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/TelemetryIndexingBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/TelemetryIndexingBolt.java b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/TelemetryIndexingBolt.java
index 965deb5..2c4e0a9 100644
--- a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/TelemetryIndexingBolt.java
+++ b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/TelemetryIndexingBolt.java
@@ -33,10 +33,10 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import com.opensoc.helpers.topology.ErrorGenerator;
 import com.opensoc.index.interfaces.IndexAdapter;
 import com.opensoc.json.serialization.JSONEncoderHelper;
 import com.opensoc.metrics.MetricReporter;
-import com.opensoc.topologyhelpers.ErrorGenerator;
 
 /**
  * 
@@ -59,6 +59,8 @@ import com.opensoc.topologyhelpers.ErrorGenerator;
 public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 
 	private JSONObject metricConfiguration;
+	private String _indexDateFormat;
+	
 	private Set<Tuple> tuple_queue = new HashSet<Tuple>();
 
 	/**
@@ -140,7 +142,18 @@ public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 
 		return this;
 	}
+	
+	/**
+	 * 
+	 * @param dateFormat
+	 *           timestamp to append to index names
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexTimestamp(String indexTimestamp) {
+		_indexDateFormat = indexTimestamp;
 
+		return this;
+	}
 	/**
 	 * 
 	 * @param config
@@ -161,7 +174,7 @@ public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 		try {
 			
 			_adapter.initializeConnection(_IndexIP, _IndexPort,
-					_ClusterName, _IndexName, _DocumentName, _BulkIndexNumber);
+					_ClusterName, _IndexName, _DocumentName, _BulkIndexNumber, _indexDateFormat);
 			
 			_reporter = new MetricReporter();
 			_reporter.initialize(metricConfiguration,
@@ -170,10 +183,8 @@ public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 		} catch (Exception e) {
 			
 			e.printStackTrace();
-			
-			String error_as_string = org.apache.commons.lang.exception.ExceptionUtils.getStackTrace(e);
-			
-			JSONObject error = ErrorGenerator.generateErrorMessage(new String("bulk index problem"), error_as_string);
+					
+			JSONObject error = ErrorGenerator.generateErrorMessage(new String("bulk index problem"), e);
 			_collector.emit("error", new Values(error));
 		}
 
@@ -222,9 +233,8 @@ public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 				_collector.fail(setElement);
 				failCounter.inc();
 				
-				String error_as_string = org.apache.commons.lang.exception.ExceptionUtils.getStackTrace(e);
 				
-				JSONObject error = ErrorGenerator.generateErrorMessage(new String("bulk index problem"), error_as_string);
+				JSONObject error = ErrorGenerator.generateErrorMessage(new String("bulk index problem"), e);
 				_collector.emit("error", new Values(error));
 			}
 			tuple_queue.clear();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/AbstractIndexAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/AbstractIndexAdapter.java b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/AbstractIndexAdapter.java
index 3644c9e..6dafbe7 100644
--- a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/AbstractIndexAdapter.java
+++ b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/AbstractIndexAdapter.java
@@ -20,6 +20,6 @@ public abstract class AbstractIndexAdapter implements IndexAdapter, Serializable
 
 	abstract public boolean initializeConnection(String ip, int port,
 			String cluster_name, String index_name, String document_name,
-			int bulk) throws Exception;
+			int bulk, String date_format) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBaseBulkAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBaseBulkAdapter.java b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBaseBulkAdapter.java
index 97af748..e5ed283 100644
--- a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBaseBulkAdapter.java
+++ b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBaseBulkAdapter.java
@@ -1,12 +1,11 @@
 package com.opensoc.indexing.adapters;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.commons.collections.Bag;
-import org.apache.commons.collections.HashBag;
+import org.apache.commons.collections.bag.HashBag;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -35,7 +34,7 @@ public class ESBaseBulkAdapter extends AbstractIndexAdapter implements
 	@Override
 	public boolean initializeConnection(String ip, int port,
 			String cluster_name, String index_name, String document_name,
-			int bulk_size) throws Exception {
+			int bulk_size, String date_format) throws Exception {
 
 		bulk_set = new HashBag();
 
@@ -141,4 +140,9 @@ public class ESBaseBulkAdapter extends AbstractIndexAdapter implements
 			return false;
 		}
 	}
+
+	public void setOptionalSettings(Map<String, String> settings) {
+		// TODO Auto-generated method stub
+		
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBulkRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBulkRotatingAdapter.java b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBulkRotatingAdapter.java
index 022bbde..ebdc7b0 100644
--- a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBulkRotatingAdapter.java
+++ b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESBulkRotatingAdapter.java
@@ -3,6 +3,7 @@ package com.opensoc.indexing.adapters;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Map;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -34,11 +35,11 @@ public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
 	private HttpClient httpclient;
 	private HttpPost post;
 
-	private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
+	private DateFormat dateFormat;
 
 	public boolean initializeConnection(String ip, int port,
 			String cluster_name, String index_name, String document_name,
-			int bulk_size) {
+			int bulk_size, String date_format) {
 
 		_LOG.info("Initializing ESBulkAdapter...");
 
@@ -51,9 +52,11 @@ public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
 			_document_name = document_name;
 
 			_bulk_size = bulk_size - 1;
+			
 
+			dateFormat = new SimpleDateFormat(date_format);
+			
 			element_count = 0;
-			index_postfix = dateFormat.format(new Date());
 			running_index_postfix = "NONE";
 
 			Settings settings = ImmutableSettings.settingsBuilder()
@@ -76,7 +79,7 @@ public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
 
 		index_postfix = dateFormat.format(new Date());
 
-		bulkRequest.add(client.prepareIndex(_index_name + "-" + index_postfix,
+		bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
 				_document_name).setSource(raw_message));
 
 		return doIndex();
@@ -86,7 +89,7 @@ public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
 
 		index_postfix = dateFormat.format(new Date());
 
-		bulkRequest.add(client.prepareIndex(_index_name + "-" + index_postfix,
+		bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
 				_document_name).setSource(raw_message));
 
 		return doIndex();
@@ -149,4 +152,9 @@ public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
 		return 1;
 	}
 
+	public void setOptionalSettings(Map<String, String> settings) {
+		// TODO Auto-generated method stub
+		
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESTimedRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESTimedRotatingAdapter.java b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESTimedRotatingAdapter.java
index e0a8b98..a94ef97 100644
--- a/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESTimedRotatingAdapter.java
+++ b/opensoc-streaming/OpenSOC-Indexing/src/main/java/com/opensoc/indexing/adapters/ESTimedRotatingAdapter.java
@@ -5,6 +5,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.collections.Bag;
 import org.apache.commons.collections.HashBag;
@@ -29,16 +30,23 @@ public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
 	private int _port;
 	private String _ip;
 	public transient TransportClient client;
-	private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
+	private DateFormat dateFormat;
+	
+	private Map<String, String> tuning_settings;
 
 	private Bag bulk_set;
 
 	private Settings settings;
+	
+	public void setOptionalSettings(Map<String, String> settings)
+	{
+		tuning_settings = settings;
+	}
 
 	@Override
 	public boolean initializeConnection(String ip, int port,
 			String cluster_name, String index_name, String document_name,
-			int bulk_size) throws Exception {
+			int bulk_size, String date_format) throws Exception {
 
 		bulk_set = new HashBag();
 
@@ -51,11 +59,25 @@ public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
 			_index_name = index_name;
 			_document_name = document_name;
 			_bulk_size = bulk_size;
+			
+
+			dateFormat = new SimpleDateFormat(date_format);
 
 			System.out.println("Bulk indexing is set to: " + _bulk_size);
 
-			settings = ImmutableSettings.settingsBuilder()
-					.put("cluster.name", _cluster_name).build();
+			ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() ;	
+			
+			if(tuning_settings != null && tuning_settings.size() > 0)
+			{
+					builder.put(tuning_settings);
+			}
+			
+			builder.put("cluster.name", _cluster_name);
+			builder.put("client.transport.ping_timeout","500s");
+			
+			
+			settings = builder.build();
+					
 			client = new TransportClient(settings)
 					.addTransportAddress(new InetSocketTransportAddress(_ip,
 							_port));
@@ -83,7 +105,7 @@ public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
 			bulk_set.add(raw_message);
 			set_size = bulk_set.size();
 			
-			System.out.println("Bulk size is now: " + bulk_set.size());
+			_LOG.trace("[OpenSOC] Incremented bulk size to: " + bulk_set.size());
 		}
 
 		try {
@@ -122,7 +144,7 @@ public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
 				while (iterator.hasNext()) {
 					JSONObject setElement = iterator.next();
 					
-					System.out.println("Flushing to index: " + _index_name+ "_" + index_postfix);
+					_LOG.trace("[OpenSOC] Flushing to index: " + _index_name+ "_" + index_postfix);
 
 					IndexRequestBuilder a = client.prepareIndex(_index_name+ "_" + index_postfix,
 							_document_name);
@@ -131,22 +153,27 @@ public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
 
 				}
 
-				System.out.println("Performing bulk load of size: "
+				_LOG.trace("[OpenSOC] Performing bulk load of size: "
 						+ bulkRequest.numberOfActions());
 
 				BulkResponse resp = bulkRequest.execute().actionGet();
 				
+				for(BulkItemResponse r: resp.getItems())
+				{
+					r.getResponse();
+					_LOG.trace("[OpenSOC] ES SUCCESS MESSAGE: " + r.getFailureMessage());
+				}
 				
-				System.out.println("[OpenSOC] Received bulk response: "
-						+ resp.buildFailureMessage());
 				bulk_set.clear();
 				
 				if (resp.hasFailures()) {
-				    
+					_LOG.error("[OpenSOC] Received bulk response error: "
+							+ resp.buildFailureMessage());
+					
 					for(BulkItemResponse r: resp.getItems())
 					{
 						r.getResponse();
-						System.out.println("FAILURE MESSAGE: " + r.getFailureMessage());
+						_LOG.error("[OpenSOC] ES FAILURE MESSAGE: " + r.getFailureMessage());
 					}
 				}
 				

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-MessageParsers/pom.xml b/opensoc-streaming/OpenSOC-MessageParsers/pom.xml
index 5cfdfa7..9a7d651 100644
--- a/opensoc-streaming/OpenSOC-MessageParsers/pom.xml
+++ b/opensoc-streaming/OpenSOC-MessageParsers/pom.xml
@@ -15,14 +15,18 @@
 	<parent>
 		<groupId>com.opensoc</groupId>
 		<artifactId>OpenSOC-Streaming</artifactId>
-		<version>0.3BETA-SNAPSHOT</version>
+		<version>0.6BETA</version>
 	</parent>
 	<artifactId>OpenSOC-MessageParsers</artifactId>
+    <properties>
+ 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>	
+    </properties>
 	<dependencies>
 		<dependency>
 			<groupId>com.opensoc</groupId>
 			<artifactId>OpenSOC-Common</artifactId>
-			<version>${parent.version}</version>
+			<version>${project.parent.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.storm</groupId>
@@ -40,11 +44,7 @@
 			<artifactId>guava</artifactId>
 			<version>${global_guava_version}</version>
 		</dependency>
-		<dependency>
-			<groupId>com.github.fge</groupId>
-			<artifactId>json-schema-validator</artifactId>
-			<version>${global_json_schema_validator_version}</version>
-		</dependency>
+
 		<dependency>
 			<groupId>io.thekraken</groupId>
 			<artifactId>grok</artifactId>
@@ -74,15 +74,31 @@
 				<configuration>
 					<targetJdk>1.7</targetJdk>
 				</configuration>
+				
 			</plugin>
 		</plugins>
 	</reporting>
 	<build>
+	<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<inherited>true</inherited>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+			</plugins>
 		<resources>
 		<resource>
 				<directory>src/main/resources</directory>
 			</resource>
 			<resource>
+				<directory>src/main/resources/patterns</directory>
+			</resource>
+			<resource>
 				<directory>src/test/resources</directory>
 			</resource>
 		</resources>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-MessageParsers/readme.md
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-MessageParsers/readme.md b/opensoc-streaming/OpenSOC-MessageParsers/readme.md
new file mode 100644
index 0000000..128932a
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-MessageParsers/readme.md
@@ -0,0 +1,82 @@
+#OpenSOC-Parsers
+
+##Module Description
+
+This module provides a list of parsers that can be used with the OpenSOC framework.  There are two types of parsers.  First type is a Java parser.  This kind of parser is optimized for speed and performance and is built for use with higher velicity topologies.  These parsers are not easily modifiable and in order to make changes to them the entire topology need to be recompiled.  The second type of parser provided with the system is a Grok parser.  This type of parser is primarily designed for lower-velocity topologies or for quickly standing up a parser for a new telemetry before a permanent Java parser can be written for it.
+
+##Message Format
+
+All opensoc messages follow a specific format in order to ingest a message.  If a message does not conform to this format it will be dropped and put onto an error queue for further examination.  The message must be of a JSON format and must have a JSON tag message like so:
+
+```
+{"message" : message content}
+
+```
+
+Where appropriate there is also a standardization around the 5-tuple JSON fields.  This is done so the topology correlation engine further down stream can correlate messages from different topologies by these fields.  We are currently working on expanding the message standardization beyond these fields, but this feature is not yet availabe.  The standard field names are as follows:
+
+* ip_src_addr: layer 3 source IP
+* ip_dst_addr: layer 3 dest IP
+* ip_src_port: layer 4 source port
+* ip_dst_port: layer 4 dest port
+* protocol: layer 4 protocol
+* timestamp (epoch)
+* original_string: A human friendly string representation of the message
+
+The timestamp and original_string fields are madatory. The remaining standard fields are optional.  If any of the optional fields are not applicable then the field should be left out of the JSON.
+
+So putting it all together a typical OpenSOC message with all 5-tuple fields present would look like the following:
+
+```json
+{
+"message": 
+{"ip_src_addr": xxxx, 
+"ip_dst_addr": xxxx, 
+"ip_src_port": xxxx, 
+"ip_dst_port": xxxx, 
+"protocol": xxxx, 
+"original_string": xxx,
+"additional-field 1": xxx,
+}
+
+}
+```
+
+##Parser Bolt
+
+The OpenSOC parser bolt is a standard bolt, which can be extended with multiple Java and Grok parser adapter for parsing different topology messages.  The bolt signature for declaration in a storm topology is as follows:
+
+```
+AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+.withMessageParser(parser)
+.withMessageFilter(new GenericMessageFilter())
+.withMetricConfig(config);
+
+```
+
+Metric Config - optional argument for exporting custom metrics to graphite.  If set to null no metrics will be exported.  If set, then a list of metrics defined in the metrics.conf file of each topology will define will metrics are exported and how often.
+
+Message Filter - a filter defining which messages can be dropped.  This feature is only present in the Java paerer adapters
+
+Message Parser - defines the parser adapter to be used for a topology
+
+##Parser Adapters
+
+Parser adapters are loaded dynamically in each OpenSOC topology.  They are defined in topology.conf in the configuration item bolt.parser.adapter
+
+###Java Parser Adapters
+Java parser adapters are indended for higher-velocity topologies and are not easily changed or extended.  As the adoption of OpenSOC continues we plan on extending our library of Java adapters to process more log formats.  As of this moment the Java adapters included with OpenSOC are:
+
+* com.opensoc.parsing.parsers.BasicIseParser : Parse ISE messages
+* com.opensoc.parsing.parsers.BasicBroParser : Parse Bro messages
+* com.opensoc.parsing.parsers.BasicSourcefireParser : Parse Sourcefire messages
+* com.opensoc.parsing.parsers.BasicLancopeParser : Parse Lancope messages
+
+###Grok Parser Adapters
+Grok parser adapters are designed primarly for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies.  Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible.  Grok parsers are defined via a config file and the topplogy does not need to be recombiled in order to make changes to them.  An example of a Grok perser is:
+
+* com.opensoc.parsing.parsers.GrokSourcefireParser
+
+For more information on the Grok project please refer to the following link:
+
+https://github.com/thekrakken/java-grok

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/AbstractParserBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/AbstractParserBolt.java b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/AbstractParserBolt.java
index ace7141..7dc5d4f 100644
--- a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/AbstractParserBolt.java
+++ b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/AbstractParserBolt.java
@@ -18,9 +18,7 @@
 package com.opensoc.parsing;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.Map;
-import java.util.zip.Deflater;
 
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -105,32 +103,15 @@ public abstract class AbstractParserBolt extends BaseRichBolt {
 	public boolean checkForSchemaCorrectness(JSONObject message) {
 		int correct = 0;
 
-		if (message.containsKey("ip_src_addr")) {
-			correct++;
-			LOG.trace("[OpenSOC] Message contains ip_src_addr");
-		}
-		if (message.containsKey("ip_dst_addr")) {
-			correct++;
-			LOG.trace("[OpenSOC] Message contains ip_dst_addr");
-		}
-		if (message.containsKey("ip_src_port")) {
-			correct++;
-			LOG.trace("[OpenSOC] Message contains ip_src_port");
-		}
-		if (message.containsKey("ip_dst_port")) {
-			correct++;
-			LOG.trace("[OpenSOC] Message contains ip_dst_port");
-		}
-		if (message.containsKey("protocol")) {
-			correct++;
-			LOG.trace("[OpenSOC] Message contains protocol");
-		}
-
-		if (correct == 0) {
-			LOG.trace("[OpenSOC] Message conforms to schema: " + message);
+		
+		if (!(message.containsKey("original_string"))) {
+			LOG.trace("[OpenSOC] Message does not have original_string: " + message);
+			return false;
+		} else if (!(message.containsKey("timestamp"))) { 
+			LOG.trace("[OpenSOC] Message does not have timestamp: " + message);
 			return false;
 		} else {
-			LOG.trace("[OpenSOC] Message does not conform to schema: "
+			LOG.trace("[OpenSOC] Message conforms to schema: "
 					+ message);
 			return true;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/PcapParserBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/PcapParserBolt.java b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/PcapParserBolt.java
index bd3951b..4fb6482 100644
--- a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/PcapParserBolt.java
+++ b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/PcapParserBolt.java
@@ -7,9 +7,9 @@ import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
+import com.opensoc.helpers.topology.ErrorGenerator;
 import com.opensoc.parsing.parsers.PcapParser;
 import com.opensoc.pcap.PacketInfo;
-import com.opensoc.topologyhelpers.ErrorGenerator;
 
 import backtype.storm.generated.Grouping;
 import backtype.storm.task.OutputCollector;
@@ -49,11 +49,9 @@ private Map conf;
   @SuppressWarnings("unused")
 private int numberOfCharsToUseForShuffleGrouping = 4;
 
-  /** The micro sec multiplier. */
-  private long microSecMultiplier = 1L;
+  /** The divisor to convert nanos to expected time precision. */
+  private long timePrecisionDivisor = 1L;
 
-  /** The sec multiplier. */
-  private long secMultiplier = 1000000L;
 
   // HBaseStreamPartitioner hBaseStreamPartitioner = null ;
 
@@ -64,6 +62,26 @@ private int numberOfCharsToUseForShuffleGrouping = 4;
 
   }
 
+  public PcapParserBolt withTsPrecision(String tsPrecision) {
+	if (tsPrecision.equalsIgnoreCase("MILLI")) {
+	  //Convert nanos to millis
+	  LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
+	  timePrecisionDivisor = 1000000L;
+	} else if (tsPrecision.equalsIgnoreCase("MICRO")) {
+	  //Convert nanos to micro
+	  LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
+	  timePrecisionDivisor = 1000L;
+	} else if (tsPrecision.equalsIgnoreCase("NANO")) {
+	  //Keep nano as is.
+	  LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
+	  timePrecisionDivisor = 1L;
+	} else {
+	  LOG.info("bolt.parser.ts.precision not set. Default to NANO");
+	  timePrecisionDivisor = 1L;
+	}
+	return this;
+  }
+  
   /*
    * (non-Javadoc)
    * 
@@ -116,19 +134,7 @@ private int numberOfCharsToUseForShuffleGrouping = 4;
     
     Grouping._Fields a;
 
-    if (conf.containsKey("bolt.parser.ts.precision")) {
-      String timePrecision = conf.get("bolt.parser.ts.precision").toString();
-      if (timePrecision.equalsIgnoreCase("MILLI")) {
-        microSecMultiplier = 1L / 1000;
-        secMultiplier = 1000L;
-      } else if (timePrecision.equalsIgnoreCase("MICRO")) {
-        microSecMultiplier = 1L;
-        secMultiplier = 1000000L;
-      } else if (timePrecision.equalsIgnoreCase("NANO")) {
-        microSecMultiplier = 1000L;
-        secMultiplier = 1000000000L;
-      }
-    }
+
     // hBaseStreamPartitioner = new HBaseStreamPartitioner(
     // conf.get("bolt.hbase.table.name").toString(),
     // 0,
@@ -165,6 +171,7 @@ public void execute(Tuple input) {
 
         for (PacketInfo packetInfo : packetInfoList) {
         	
+        	
         	String string_pcap = packetInfo.getJsonIndexDoc();
         	Object obj=JSONValue.parse(string_pcap);
         	  JSONObject header=(JSONObject)obj;
@@ -172,40 +179,6 @@ public void execute(Tuple input) {
         	JSONObject message = new JSONObject();
         	//message.put("key", packetInfo.getKey());
         	
-        	if(header.containsKey("src_addr"))
-        	{
-        		String tmp = header.get("src_addr").toString();
-        		header.remove("src_addr");
-        		header.put("ip_src_addr", tmp);
-        	}
-        	
-        	if(header.containsKey("dst_addr"))
-        	{
-        		String tmp = header.get("dst_addr").toString();
-        		header.remove("dst_addr");
-        		header.put("ip_dst_addr", tmp);
-        	}
-        	
-        	if(header.containsKey("src_port"))
-        	{
-        		String tmp = header.get("src_port").toString();
-        		header.remove("src_port");
-        		header.put("ip_src_port", tmp);
-        	}
-        	
-        	if(message.containsKey("dst_port"))
-        	{
-        		String tmp = header.get("dst_port").toString();
-        		header.remove("dst_port");
-        		header.put("ip_dst_port", tmp);
-        	}
-        	if(message.containsKey("ip_protocol"))
-        	{
-        		String tmp = header.get("ip_protocol").toString();
-        		header.remove("ip_protocol");
-        		header.put("protocol", tmp);
-        	}
-        	
         	message.put("message", header);
         	
         	collector.emit("message", new Values(packetInfo.getKey(), message));
@@ -214,7 +187,7 @@ public void execute(Tuple input) {
         	
           collector.emit("pcap_header_stream", new Values(packetInfo.getJsonDoc(), packetInfo.getKey()));
           collector.emit("pcap_data_stream", new Values(packetInfo.getKey(),
-              (packetInfo.getPacketHeader().getTsSec() * secMultiplier + packetInfo.getPacketHeader().getTsUsec() * microSecMultiplier),
+             packetInfo.getPacketTimeInNanos() / timePrecisionDivisor,
               input.getBinary(0)));
 
           // collector.emit(new Values(packetInfo.getJsonDoc(), packetInfo
@@ -230,11 +203,9 @@ public void execute(Tuple input) {
       e.printStackTrace();
       LOG.error("Exception while processing tuple", e);
       
-      String error_as_string = org.apache.commons.lang.exception.ExceptionUtils
-				.getStackTrace(e);
 
 		JSONObject error = ErrorGenerator.generateErrorMessage(
-				"Alerts problem: " + input.getBinary(0), error_as_string);
+				"Alerts problem: " + input.getBinary(0), e);
 		collector.emit("error", new Values(error));
 		
       return;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/TelemetryParserBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/TelemetryParserBolt.java b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/TelemetryParserBolt.java
index b324eb7..8a48764 100644
--- a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/TelemetryParserBolt.java
+++ b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/TelemetryParserBolt.java
@@ -30,11 +30,11 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import com.opensoc.helpers.topology.ErrorGenerator;
 import com.opensoc.json.serialization.JSONEncoderHelper;
 import com.opensoc.metrics.MetricReporter;
 import com.opensoc.parser.interfaces.MessageFilter;
 import com.opensoc.parser.interfaces.MessageParser;
-import com.opensoc.topologyhelpers.ErrorGenerator;
 
 /**
  * Uses an adapter to parse a telemetry message from its native format into a
@@ -125,6 +125,11 @@ public class TelemetryParserBolt extends AbstractParserBolt {
 			LOG.info("[OpenSOC] Metric reporter is not initialized");
 		}
 		this.registerCounters();
+		
+		if(_parser != null)
+		_parser.init();
+		
+		
 	}
 
 	@SuppressWarnings("unchecked")
@@ -203,7 +208,7 @@ public class TelemetryParserBolt extends AbstractParserBolt {
 
 			JSONObject error = ErrorGenerator.generateErrorMessage(
 					"Parsing problem: " + new String(original_message),
-					e.toString());
+					e);
 			_collector.emit("error", new Values(error));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/parsers/AbstractParser.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/parsers/AbstractParser.java b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/parsers/AbstractParser.java
index e5fa29e..728e275 100644
--- a/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/parsers/AbstractParser.java
+++ b/opensoc-streaming/OpenSOC-MessageParsers/src/main/java/com/opensoc/parsing/parsers/AbstractParser.java
@@ -34,9 +34,15 @@ public abstract class AbstractParser implements MessageParser, Serializable {
 
 	public void initializeParser() {
 		_LOG.debug("Initializing adapter...");
+		
 
 	}
-
+	
+	public void init() {
+		
+	}
+	
+	
 	abstract public JSONObject parse(byte[] raw_message);
 
 }