You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/16 18:10:21 UTC

[5/5] incubator-metron git commit: METRON-35 Implement threat intelligence message enrichment closes apache/incubator-metron#22

METRON-35 Implement threat intelligence message enrichment closes apache/incubator-metron#22


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1ddfd12c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1ddfd12c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1ddfd12c

Branch: refs/heads/master
Commit: 1ddfd12c274ccf45042d9a6533e96840936c1aaa
Parents: 905b09f
Author: cestella <ce...@gmail.com>
Authored: Tue Feb 16 12:09:54 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Feb 16 12:09:54 2016 -0500

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 .../inventory/singlenode-vagrant/group_vars/all |   2 +
 deployment/roles/hadoop_setup/tasks/main.yml    |   6 +-
 .../roles/metron_streaming/tasks/main.yml       |  14 +
 deployment/roles/metron_streaming/vars/main.yml |   2 +
 metron-streaming/Metron-Alerts/pom.xml          | 274 ++++----
 .../src/main/resources/hbase-site.xml           | 131 ----
 metron-streaming/Metron-Common/pom.xml          | 378 ++++++-----
 .../enrichment/EnrichmentSplitterBolt.java      | 112 ++++
 .../interfaces/EnrichmentAdapter.java           |   1 +
 .../java/org/apache/metron/hbase/Connector.java |   6 +-
 .../java/org/apache/metron/hbase/HBaseBolt.java |  37 +-
 .../apache/metron/hbase/HTableConnector.java    |  38 +-
 .../org/apache/metron/hbase/HTableProvider.java |  17 +
 .../org/apache/metron/hbase/TableConfig.java    | 103 +++
 .../org/apache/metron/hbase/TableProvider.java  |  14 +
 .../apache/metron/hbase/TupleTableConfig.java   | 102 +--
 .../apache/metron/reference/lookup/Lookup.java  |  61 ++
 .../metron/reference/lookup/LookupKey.java      |   8 +
 .../lookup/accesstracker/AccessTracker.java     |  23 +
 .../lookup/accesstracker/AccessTrackerUtil.java |  69 ++
 .../accesstracker/BloomAccessTracker.java       | 132 ++++
 .../accesstracker/PersistentAccessTracker.java  | 192 ++++++
 .../reference/lookup/handler/Handler.java       |  13 +
 .../metron/threatintel/ThreatIntelKey.java      |  75 +++
 .../metron/threatintel/ThreatIntelResults.java  |  55 ++
 .../metron/threatintel/hbase/Converter.java     |  82 +++
 .../threatintel/hbase/ThreatIntelLookup.java    |  60 ++
 metron-streaming/Metron-DataLoads/pom.xml       | 339 +++++++---
 .../src/main/assembly/assembly.xml              |  28 +
 .../src/main/bash/threatintel_bulk_load.sh      |  21 +
 .../src/main/bash/threatintel_bulk_prune.sh     |  21 +
 .../dataloads/LeastRecentlyUsedPruner.java      | 207 ++++++
 .../metron/dataloads/ThreatIntelBulkLoader.java | 199 ++++++
 .../metron/dataloads/extractor/Extractor.java   |  14 +
 .../dataloads/extractor/ExtractorCreator.java   |  10 +
 .../dataloads/extractor/ExtractorHandler.java   |  73 ++
 .../metron/dataloads/extractor/Extractors.java  |  44 ++
 .../dataloads/extractor/csv/CSVExtractor.java   |  99 +++
 .../extractor/inputformat/Formats.java          |  41 ++
 .../inputformat/InputFormatHandler.java         |  14 +
 .../extractor/inputformat/WholeFileFormat.java  |  95 +++
 .../dataloads/extractor/stix/StixExtractor.java |  96 +++
 .../stix/types/AbstractObjectTypeHandler.java   |  20 +
 .../extractor/stix/types/AddressHandler.java    |  61 ++
 .../extractor/stix/types/DomainHandler.java     |  41 ++
 .../extractor/stix/types/HostnameHandler.java   |  39 ++
 .../extractor/stix/types/ObjectTypeHandler.java |  16 +
 .../stix/types/ObjectTypeHandlers.java          |  28 +
 .../dataloads/hbase/mr/BulkLoadMapper.java      |  52 ++
 .../metron/dataloads/hbase/mr/PrunerMapper.java |  62 ++
 .../src/main/resources/hbase-site.xml           | 100 ---
 .../dataloads/extractor/ExtractorTest.java      |  61 ++
 .../extractor/csv/CSVExtractorTest.java         | 102 +++
 .../extractor/stix/StixExtractorTest.java       | 185 +++++
 .../dataloads/hbase/HBaseConverterTest.java     |  53 ++
 .../hbase/mr/BulkLoadMapperIntegrationTest.java |  91 +++
 .../dataloads/hbase/mr/BulkLoadMapperTest.java  |  78 +++
 .../metron/dataloads/hbase/mr/HBaseUtil.java    |  58 ++
 .../LeastRecentlyUsedPrunerIntegrationTest.java | 124 ++++
 .../src/main/resources/hbase-site.xml           | 127 ----
 .../Metron-EnrichmentAdapters/pom.xml           | 302 ++++-----
 .../adapters/cif/CIFHbaseAdapter.java           |   5 +
 .../enrichment/adapters/geo/GeoAdapter.java     |   5 +
 .../adapters/host/HostFromJSONListAdapter.java  |   5 +
 .../host/HostFromPropertiesFileAdapter.java     |   7 +-
 .../enrichment/adapters/jdbc/JdbcAdapter.java   |   4 +-
 .../adapters/threat/ThreatHbaseAdapter.java     |   8 +-
 .../adapters/whois/WhoisHBaseAdapter.java       |   7 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |  15 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |  19 +-
 .../metron/threatintel/ThreatIntelAdapter.java  |  97 +++
 .../metron/threatintel/ThreatIntelConfig.java   | 111 +++
 .../src/main/resources/hbase-site.xml           | 131 ----
 .../adapters/cif/CIFHbaseAdapterTest.java       |   4 +-
 .../adapters/geo/GeoMysqlAdapterTest.java       |   4 +-
 .../adapters/whois/WhoisHBaseAdapterTest.java   |   3 +-
 metron-streaming/Metron-Indexing/pom.xml        | 188 +++---
 metron-streaming/Metron-MessageParsers/pom.xml  |  10 +
 .../apache/metron/bolt/TelemetryParserBolt.java |  54 +-
 .../src/main/resources/hbase-site.xml           | 127 ----
 metron-streaming/Metron-Topologies/pom.xml      | 428 +++++++-----
 .../Metron_Configs/etc/env/config.properties    |  10 +-
 .../resources/Metron_Configs/etc/hbase-site.xml | 127 ----
 .../Metron_Configs/topologies/pcap/local.yaml   | 122 +++-
 .../Metron_Configs/topologies/pcap/remote.yaml  | 190 ++++--
 .../src/main/resources/hbase-site.xml           | 131 ----
 .../integration/pcap/PcapIntegrationTest.java   |  53 +-
 .../util/integration/ComponentRunner.java       |   2 +-
 .../components/ElasticSearchComponent.java      |  11 +-
 .../integration/util/mock/MockGeoAdapter.java   |   5 +
 .../integration/util/mock/MockHTable.java       | 668 +++++++++++++++++++
 .../util/threatintel/ThreatIntelHelper.java     |  22 +
 metron-streaming/pom.xml                        |  17 +-
 94 files changed, 5583 insertions(+), 1917 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 7da5300..c9d729b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,4 @@ target
 *.project
 *.classpath
 *.settings
-
+*hbase-site.xml

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/inventory/singlenode-vagrant/group_vars/all
----------------------------------------------------------------------
diff --git a/deployment/inventory/singlenode-vagrant/group_vars/all b/deployment/inventory/singlenode-vagrant/group_vars/all
index 44d954b..bf3c610 100644
--- a/deployment/inventory/singlenode-vagrant/group_vars/all
+++ b/deployment/inventory/singlenode-vagrant/group_vars/all
@@ -29,6 +29,8 @@ yaf_topic: "ipfix"
 snort_topic: "snort"
 bro_topic: "bro"
 pcap_hbase_table: "pcap_test"
+tracker_hbase_table: "access_tracker"
+threatintel_ip_hbase_table: "malicious_ip"
 pycapa_repo: "https://github.com/OpenSOC/pycapa.git"
 pycapa_home: "/opt/pycapa"
 pycapa_topic: "pcap"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/roles/hadoop_setup/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/hadoop_setup/tasks/main.yml b/deployment/roles/hadoop_setup/tasks/main.yml
index 2e240e8..d8069a0 100644
--- a/deployment/roles/hadoop_setup/tasks/main.yml
+++ b/deployment/roles/hadoop_setup/tasks/main.yml
@@ -1,7 +1,11 @@
 ---
 - name: Create HBase tables
-  shell: echo "create '{{ pcap_hbase_table }}','t'" | hbase shell -n
+  shell: echo "create '{{ item }}','t'" | hbase shell -n
   ignore_errors: yes
+  with_items:
+    - "{{ pcap_hbase_table }}"
+    - "{{ tracker_hbase_table }}"
+    - "{{ threatintel_ip_hbase_table }}"
 
 - name: Create Kafka topics
   shell: "{{ kafka_home }}/bin/kafka-topics.sh --zookeeper {{ zookeeper_url }} --create --topic {{ item }} --partitions 1 --replication-factor 1"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index de387cf..0164d55 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -3,6 +3,7 @@
   file: path="{{ metron_directory }}/{{ item.name }}"  state=directory mode=0755
   with_items:
       - { name: 'lib'}
+      - { name: 'bin'}
       - { name: 'config'}
 
 - stat: path={{ metron_jar_path }}
@@ -16,6 +17,14 @@
     src: "{{ metron_jar_path }}"
     dest: "{{ metron_directory }}/lib/"
 
+- name: Copy Metron DataLoads bundle
+  copy:
+    src: "{{ metron_dataloads_path }}"
+    dest: "{{ metron_directory }}"
+
+- name: Unbundle Metron DataLoads bundle
+  shell: cd {{ metron_directory }} && tar xzvf *.tar.gz
+
 - name: Alternatives link for "java"
   alternatives: name={{ item.name }} link={{ item.link }}  path={{ item.path }}
   with_items:
@@ -50,6 +59,11 @@
     - { regexp: "spout.kafka.topic.pcap=", line: "spout.kafka.topic.pcap={{ pycapa_topic }}" }
     - { regexp: "spout.kafka.topic.bro=", line: "spout.kafka.topic.bro={{ bro_topic }}" }
     - { regexp: "bolt.hbase.table.name=", line: "bolt.hbase.table.name={{ pcap_hbase_table }}" }
+    - { regexp: "threat.intel.tracker.table=", line: "threat.intel.tracker.table={{ tracker_hbase_table }}" }
+    - { regexp: "threat.intel.tracker.cf=", line: "threat.intel.tracker.cf=t" }
+    - { regexp: "threat.intel.ip.table=", line: "threat.intel.ip.table={{ threatintel_ip_hbase_table }}" }
+    - { regexp: "threat.intel.ip.cf=", line: "threat.intel.ip.cf=t" }
+    - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.search[0] }}" }
 
 - name: Add Elasticsearch templates for topologies
   uri:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/roles/metron_streaming/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/vars/main.yml b/deployment/roles/metron_streaming/vars/main.yml
index e7004c1..a318db3 100644
--- a/deployment/roles/metron_streaming/vars/main.yml
+++ b/deployment/roles/metron_streaming/vars/main.yml
@@ -2,6 +2,8 @@
 # vars file for elasticsearch
 metron_directory: /usr/metron/{{ metron_version }}
 metron_jar_name: Metron-Topologies-{{ metron_version }}.jar
+metron_dataloads_name: Metron-DataLoads-{{ metron_version }}-archive.tar.gz
 metron_jar_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/target/{{ metron_jar_name }}"
+metron_dataloads_path: "{{ playbook_dir }}/../../metron-streaming/Metron-DataLoads/target/{{ metron_dataloads_name }}"
 metron_src_config_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs"
 metron_properties_config_path: "{{ metron_directory }}/config/etc/env/config.properties"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Alerts/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/pom.xml b/metron-streaming/Metron-Alerts/pom.xml
index 971a4bb..8a2a6c8 100644
--- a/metron-streaming/Metron-Alerts/pom.xml
+++ b/metron-streaming/Metron-Alerts/pom.xml
@@ -10,127 +10,157 @@
 	the specific language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.metron</groupId>
-		<artifactId>Metron-Streaming</artifactId>
-		<version>0.6BETA</version>
-	</parent>
-	<artifactId>Metron-Alerts</artifactId>
-	<name>Metron-Alerts</name>
-	<description>Taggers for alerts</description>
-	<properties>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>Metron-Streaming</artifactId>
+        <version>0.6BETA</version>
+    </parent>
+    <artifactId>Metron-Alerts</artifactId>
+    <name>Metron-Alerts</name>
+    <description>Taggers for alerts</description>
+    <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>		
-		<commons.validator.version>1.4.0</commons.validator.version>
-	</properties>
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.metron</groupId>
-			<artifactId>Metron-Common</artifactId>
-			<version>${project.parent.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>com.googlecode.json-simple</groupId>
-			<artifactId>json-simple</artifactId>
-			<version>${global_json_simple_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>${global_storm_version}</version>
-			<scope>provided</scope>
-			<exclusions>
-				<exclusion>
-				   <artifactId>servlet-api</artifactId>
-				   <groupId>javax.servlet</groupId>
-				  </exclusion>
-		    </exclusions>					
-		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_2.9.2</artifactId>
-			<version>${global_kafka_version}</version>
-			<scope>provided</scope>
-			<exclusions>
-				<exclusion>
-					<groupId>com.sun.jmx</groupId>
-					<artifactId>jmxri</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jdmk</groupId>
-					<artifactId>jmxtools</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>javax.jms</groupId>
-					<artifactId>jms</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.codahale.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>${global_metrics_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-validator</groupId>
-			<artifactId>commons-validator</artifactId>
-			<version>${commons.validator.version}</version>
-			<exclusions>
-				<exclusion>
-				
-				<groupId>commons-beanutils</groupId>
-				
-				<artifactId>commons-beanutils</artifactId>
-				
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<version>2.18</version>
-				<configuration>
-					<systemProperties>
-						<property>
-							<name>mode</name>
-							<value>local</value>
-						</property>
-					</systemProperties>
-				</configuration>
-			</plugin>		
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>1.7</source>
-					<target>1.7</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-pmd-plugin</artifactId>
-				<version>3.3</version>
-				<configuration>
-					<targetJdk>1.7</targetJdk>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>emma-maven-plugin</artifactId>
-				<version>1.0-alpha-3</version>
-				<inherited>true</inherited>
-			</plugin>			
-		</plugins>
-		<resources>
-			<resource>
-				<directory>src/main/resources</directory>
-			</resource>
-		</resources>
-	</build>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.validator.version>1.4.0</commons.validator.version>
+    </properties>
+    <dependencies>
+       <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_hbase_guava_version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>${global_json_simple_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.9.2</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${global_metrics_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-validator</groupId>
+            <artifactId>commons-validator</artifactId>
+            <version>${commons.validator.version}</version>
+            <exclusions>
+                <exclusion>
+
+                    <groupId>commons-beanutils</groupId>
+
+                    <artifactId>commons-beanutils</artifactId>
+
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.18</version>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>mode</name>
+                            <value>local</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <version>3.3</version>
+                <configuration>
+                    <targetJdk>1.7</targetJdk>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+                <inherited>true</inherited>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml b/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
deleted file mode 100644
index 8d812a9..0000000
--- a/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,131 +0,0 @@
-<!--Tue Apr  1 18:16:39 2014-->
-  <configuration>
-    <property>
-    <name>hbase.tmp.dir</name>
-    <value>/disk/h/hbase</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
-    <value>0.5</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.codecs</name>
-    <value>lzo,gz,snappy</value>
-  </property>
-    <property>
-    <name>hbase.hstore.flush.retries.number</name>
-    <value>120</value>
-  </property>
-    <property>
-    <name>hbase.client.keyvalue.maxsize</name>
-    <value>10485760</value>
-  </property>
-    <property>
-    <name>hbase.rootdir</name>
-    <value>hdfs://nn1:8020/apps/hbase/data</value>
-  </property>
-    <property>
-    <name>hbase.defaults.for.version.skip</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.client.scanner.caching</name>
-    <value>100</value>
-  </property>
-    <property>
-    <name>hbase.superuser</name>
-    <value>hbase</value>
-  </property>
-    <property>
-    <name>hfile.block.cache.size</name>
-    <value>0.40</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.checksum.verify</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.mslab.enabled</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.max.filesize</name>
-    <value>107374182400</value>
-  </property>
-    <property>
-    <name>hbase.cluster.distributed</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>zookeeper.session.timeout</name>
-    <value>30000</value>
-  </property>
-    <property>
-    <name>zookeeper.znode.parent</name>
-    <value>/hbase-unsecure</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.global.memstore.lowerLimit</name>
-    <value>0.38</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.handler.count</name>
-    <value>240</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.mslab.chunksize</name>
-    <value>8388608</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.quorum</name>
-    <value>zkpr1,zkpr2,zkpr3</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.useMulti</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.majorcompaction</name>
-    <value>86400000</value>
-  </property>
-    <property>
-    <name>hbase.hstore.blockingStoreFiles</name>
-    <value>200</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.property.clientPort</name>
-    <value>2181</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.flush.size</name>
-    <value>134217728</value>
-  </property>
-    <property>
-    <name>hbase.security.authorization</name>
-    <value>false</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.global.memstore.upperLimit</name>
-    <value>0.4</value>
-  </property>
-    <property>
-    <name>hbase.hstore.compactionThreshold</name>
-    <value>4</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.block.multiplier</name>
-    <value>8</value>
-  </property>
-    <property>
-    <name>hbase.security.authentication</name>
-    <value>simple</value>
-  </property>
-    <property>
-    <name>dfs.client.read.shortcircuit</name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>dfs.domain.socket.path</name>
-    <value>/var/run/hdfs/dn_socket</value>
-  </property>
-  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 75ee9d9..97ec9c7 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -10,161 +10,229 @@
 	the specific language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.metron</groupId>
-		<artifactId>Metron-Streaming</artifactId>
-		<version>0.6BETA</version>
-	</parent>
-	<artifactId>Metron-Common</artifactId>
-	<name>Metron-Common</name>
-	<description>Components common to all enrichments</description>
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-		<commons.config.version>1.10</commons.config.version>
-		<hbase.version>0.98.5-hadoop2</hbase.version>
-	</properties>
-	<repositories>
-		<repository>
-			<id>Metron-Kraken-Repo</id>
-			<name>Metron Kraken Repository</name>
-			<url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
-		</repository>
-	</repositories>
-	<dependencies>
-		<dependency>
-			<groupId>com.googlecode.json-simple</groupId>
-			<artifactId>json-simple</artifactId>
-			<version>${global_json_simple_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>${global_storm_version}</version>
-			<scope>provided</scope>
-			<exclusions>
-				<exclusion>
-				
-				   <artifactId>servlet-api</artifactId>
-				
-				   <groupId>javax.servlet</groupId>
-				
-				  </exclusion>
-			</exclusions>			
-		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_2.9.2</artifactId>
-			<version>${global_kafka_version}</version>
-			<scope>provided</scope>
-			<exclusions>
-				<exclusion>
-					<groupId>com.sun.jmx</groupId>
-					<artifactId>jmxri</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jdmk</groupId>
-					<artifactId>jmxtools</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>javax.jms</groupId>
-					<artifactId>jms</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.codahale.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>${global_metrics_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>com.codahale.metrics</groupId>
-			<artifactId>metrics-graphite</artifactId>
-			<version>${global_metrics_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-configuration</groupId>
-			<artifactId>commons-configuration</artifactId>
-			<version>${commons.config.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.krakenapps</groupId>
-			<artifactId>kraken-pcap</artifactId>
-			<version>${global_pcap_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>${global_junit_version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-client</artifactId>
-			<version>${hbase.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.github.fge</groupId>
-			<artifactId>json-schema-validator</artifactId>
-			<version>${global_json_schema_validator_version}</version>
-		</dependency>
-	</dependencies>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>Metron-Streaming</artifactId>
+        <version>0.6BETA</version>
+    </parent>
+    <artifactId>Metron-Common</artifactId>
+    <name>Metron-Common</name>
+    <description>Components common to all enrichments</description>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+    </properties>
+    <repositories>
+        <repository>
+            <id>Metron-Kraken-Repo</id>
+            <name>Metron Kraken Repository</name>
+            <url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
+        </repository>
+    </repositories>
+    <dependencies>
+        <dependency>
+            <groupId>com.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>${global_opencsv_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>${global_json_simple_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${global_slf4j_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.9.2</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${global_metrics_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+            <version>${global_metrics_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <version>${commons.config.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.krakenapps</groupId>
+            <artifactId>kraken-pcap</artifactId>
+            <version>${global_pcap_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-simple</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_guava_version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${global_junit_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>${global_json_schema_validator_version}</version>
+        </dependency>
+    </dependencies>
 
-	<reporting>
-		<plugins>
-			<!-- Normally, dependency report takes time, skip it -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-project-info-reports-plugin</artifactId>
-				<version>2.7</version>
+    <reporting>
+        <plugins>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
 
-				<configuration>
-					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>emma-maven-plugin</artifactId>
-				<version>1.0-alpha-3</version>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-pmd-plugin</artifactId>
-				<configuration>
-					<targetJdk>1.7</targetJdk>
-				</configuration>
-			</plugin>
-		</plugins>
-	</reporting>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>1.7</source>
-					<compilerArgument>-Xlint:unchecked</compilerArgument>
-					<target>1.7</target>
-				</configuration>
-			</plugin>
-		</plugins>
-		<resources>
-			<resource>
-				<directory>src/main/resources</directory>
-			</resource>
-		</resources>
-	</build>
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <configuration>
+                    <targetJdk>1.7</targetJdk>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.7</source>
+                    <compilerArgument>-Xlint:unchecked</compilerArgument>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.4</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
new file mode 100644
index 0000000..503d3ae
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
@@ -0,0 +1,112 @@
+package org.apache.metron.enrichment;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import org.apache.metron.bolt.SplitBolt;
+import org.apache.metron.domain.Enrichment;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Splits an incoming message into one sub-message per configured {@link Enrichment}.
+ * Each sub-message carries only the fields that enrichment asks for and is keyed by the
+ * enrichment's name, the same names reported by {@link #getStreamIds()}.
+ *
+ * Created by cstella on 2/10/16.
+ */
+public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
+    protected static final Logger LOG = LoggerFactory.getLogger(EnrichmentSplitterBolt.class);
+    protected List<Enrichment> enrichments = new ArrayList<>();
+    protected String messageFieldName = "message";
+
+    /**
+     * @param enrichments The enrichments to split messages for; {@code null} is treated as none
+     * @return Instance of this class
+     */
+    public EnrichmentSplitterBolt withEnrichments(List<Enrichment> enrichments) {
+        // A null list would otherwise only surface later as an NPE in getStreamIds()/splitMessage().
+        this.enrichments = enrichments != null ? enrichments : new ArrayList<Enrichment>();
+        return this;
+    }
+
+    /**
+     * @param messageFieldName Name of the tuple field that holds the message (defaults to "message")
+     * @return Instance of this class
+     */
+    public EnrichmentSplitterBolt withMessageFieldName(String messageFieldName) {
+        this.messageFieldName = messageFieldName;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext) {
+        // Nothing to set up.
+    }
+
+    /**
+     * Returns the tuple's "key" field when it can be read, otherwise a freshly generated UUID.
+     *
+     * @param tuple   The tuple to read the key from
+     * @param message Not consulted by this implementation
+     * @return The tuple's key, or a random UUID string when the tuple has no usable key
+     */
+    @Override
+    public String getKey(Tuple tuple, JSONObject message) {
+        String key = null;
+        try {
+            key = tuple.getStringByField("key");
+        }
+        catch(RuntimeException e) {
+            // Best effort: a missing or non-string key is not fatal, we fall back to a random key.
+            // Errors (OutOfMemoryError and friends) are deliberately no longer swallowed here.
+            LOG.debug("Unable to read field 'key' from tuple, generating a random key", e);
+        }
+        return key != null ? key : UUID.randomUUID().toString();
+    }
+
+    /**
+     * @param tuple The tuple carrying the message under {@code messageFieldName}
+     * @return A single-element list holding that field's value cast to {@link JSONObject}
+     */
+    @Override
+    public List<JSONObject> generateMessages(Tuple tuple) {
+        return Arrays.asList((JSONObject)tuple.getValueByField(messageFieldName));
+    }
+
+    /**
+     * @return The names of all configured enrichments
+     */
+    @Override
+    public Set<String> getStreamIds() {
+        Set<String> streamIds = new HashSet<>();
+        for(Enrichment enrichment: enrichments) {
+            streamIds.add(enrichment.getName());
+        }
+        return streamIds;
+    }
+
+    /**
+     * Builds one JSON object per enrichment containing the values of that enrichment's fields.
+     * Enrichments whose field list is null or empty produce no entry.
+     *
+     * @param message The message to pull field values from
+     * @return Map from enrichment name to the JSON object holding its field values
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public Map<String, JSONObject> splitMessage(JSONObject message) {
+        Map<String, JSONObject> streamMessageMap = new HashMap<>();
+        for (Enrichment enrichment : enrichments) {
+            List<String> fields = enrichment.getFields();
+            if (fields != null && !fields.isEmpty()) {
+                JSONObject enrichmentObject = new JSONObject();
+                for (String field : fields) {
+                    enrichmentObject.put(field, getField(message,field));
+                }
+                streamMessageMap.put(enrichment.getName(), enrichmentObject);
+            }
+        }
+        return streamMessageMap;
+    }
+
+    /**
+     * Looks up a value by a '/'-separated path, descending through nested maps.
+     * If a non-map value (including a missing node) is met before the path is exhausted,
+     * that value is returned as is; if the whole path resolves to a map, the map is returned.
+     *
+     * @param object The JSON object to search; {@code null} yields {@code null}
+     * @param path   The '/'-separated path of keys
+     * @return The value found, or {@code null} when there is none
+     */
+    public Object getField(JSONObject object, String path) {
+        if (object == null) {
+            return null;
+        }
+        Map<?, ?> current = object;
+        for(String node: Splitter.on('/').split(path))  {
+            Object value = current.get(node);
+            if(!(value instanceof Map)) {
+                return value;
+            }
+            current = (Map<?, ?>) value;
+        }
+        return current;
+    }
+
+    @Override
+    public void declareOther(OutputFieldsDeclarer declarer) {
+        // No additional streams to declare.
+    }
+
+    @Override
+    public void emitOther(Tuple tuple, List<JSONObject> messages) {
+        // Nothing else to emit.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
index 8eb5937..a51bebd 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -21,6 +21,7 @@ import org.json.simple.JSONObject;
 
 public interface EnrichmentAdapter<T>
 {
+	void logAccess(T value);
 	JSONObject enrich(T value);
 	boolean initializeAdapter();
 	void cleanup();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
index 33b4eb8..a025e70 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
@@ -11,15 +11,15 @@ import java.io.Serializable;
  * Created by cstella on 1/29/16.
  */
 public abstract class Connector {
-  protected TupleTableConfig tableConf;
+  protected TableConfig tableConf;
   protected String _quorum;
   protected String _port;
 
-  public Connector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+  public Connector(final TableConfig conf, String _quorum, String _port) throws IOException {
     this.tableConf = conf;
     this._quorum = _quorum;
     this._port = _port;
   }
-  public abstract void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException;
+  public abstract void put(Put put) throws IOException;
   public abstract void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
index 64f3531..e026b18 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
@@ -11,6 +11,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 
@@ -44,7 +45,6 @@ public class HBaseBolt implements IRichBolt {
   protected TupleTableConfig conf;
   protected boolean autoAck = true;
   protected Connector connector;
-  private String connectorImpl;
   private String _quorum;
   private String _port;
 
@@ -70,32 +70,11 @@ public class HBaseBolt implements IRichBolt {
     String hostPortPair = Iterables.getFirst(Splitter.on(",").split(connString), "");
     return Iterables.getLast(Splitter.on(":").split(hostPortPair),DEFAULT_ZK_PORT);
   }
-  public HBaseBolt withConnector(String connectorImpl) {
-    this.connectorImpl = connectorImpl;
-    return this;
-  }
+
 
   public Connector createConnector() throws IOException{
     initialize();
-    if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
-      return new HTableConnector(conf, _quorum, _port);
-    }
-    else {
-      try {
-        Class<? extends Connector> clazz = (Class<? extends Connector>) Class.forName(connectorImpl);
-        return clazz.getConstructor(TupleTableConfig.class, String.class, String.class).newInstance(conf, _quorum, _port);
-      } catch (InstantiationException e) {
-        throw new IOException("Unable to instantiate connector.", e);
-      } catch (IllegalAccessException e) {
-        throw new IOException("Unable to instantiate connector: illegal access", e);
-      } catch (InvocationTargetException e) {
-        throw new IOException("Unable to instantiate connector", e);
-      } catch (NoSuchMethodException e) {
-        throw new IOException("Unable to instantiate connector: no such method", e);
-      } catch (ClassNotFoundException e) {
-        throw new IOException("Unable to instantiate connector: class not found", e);
-      }
-    }
+    return new HTableConnector(conf, _quorum, _port);
   }
 
   public void initialize() {
@@ -121,8 +100,9 @@ public class HBaseBolt implements IRichBolt {
     this.collector = collector;
 
     try {
-      this.connector = createConnector();
-
+      if(connector == null) {
+        this.connector = createConnector();
+      }
 		
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -135,11 +115,12 @@ public class HBaseBolt implements IRichBolt {
   
   public void execute(Tuple input) {
     try {
-      this.connector.put(conf.getPutFromTuple(input));
+      Put p = conf.getPutFromTuple(input);
+      this.connector.put(p);
     } catch (IOException ex) {
 
   		JSONObject error = ErrorGenerator.generateErrorMessage(
-  				"Alerts problem: " + input.getBinary(0), ex);
+  				"Alerts problem: " + input.toString(), ex);
   		collector.emit("error", new Values(error));
   		
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
index 5302882..74c6acf 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
@@ -3,6 +3,7 @@ package org.apache.metron.hbase;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -11,6 +12,7 @@ import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -30,10 +32,9 @@ import javax.annotation.Nullable;
 public class HTableConnector extends Connector implements Serializable{
   private static final Logger LOG = Logger.getLogger(HTableConnector.class);
   private Configuration conf;
-  protected HTable table;
+  protected HTableInterface table;
   private String tableName;
-
-
+  private String connectorImpl;
 
 
   /**
@@ -41,8 +42,9 @@ public class HTableConnector extends Connector implements Serializable{
    * @param conf The {@link TupleTableConfig}
    * @throws IOException
    */
-  public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+  public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
     super(conf, _quorum, _port);
+    this.connectorImpl = conf.getConnectorImpl();
     this.tableName = conf.getTableName();
     this.conf = HBaseConfiguration.create();
     
@@ -56,7 +58,7 @@ public class HTableConnector extends Connector implements Serializable{
       this.conf.get("hbase.rootdir")));
 
     try {
-      this.table = new HTable(this.conf, this.tableName);
+      this.table = getTableProvider().getTable(this.conf, this.tableName);
     } catch (IOException ex) {
       throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
     }
@@ -88,6 +90,28 @@ public class HTableConnector extends Connector implements Serializable{
     }
   }
 
+  /**
+   * Resolves the {@link TableProvider} used to open the HBase table.
+   * Falls back to {@link HTableProvider} when no implementation is configured or when the
+   * configured value starts with '$' (presumably an unresolved property placeholder).
+   * @return the provider to obtain the table from
+   * @throws IOException if the configured class cannot be loaded, is not a {@link TableProvider},
+   *         or cannot be created through a public no-argument constructor
+   */
+  protected TableProvider getTableProvider() throws IOException {
+    if(connectorImpl == null || connectorImpl.isEmpty() || connectorImpl.charAt(0) == '$') {
+      return new HTableProvider();
+    }
+    try {
+      // asSubclass checks the type up front instead of relying on an unchecked cast.
+      Class<? extends TableProvider> clazz = Class.forName(connectorImpl).asSubclass(TableProvider.class);
+      return clazz.getConstructor().newInstance();
+    } catch (ClassCastException e) {
+      throw new IOException("Unable to instantiate table provider " + connectorImpl + ": not a TableProvider", e);
+    } catch (ReflectiveOperationException e) {
+      // Covers class-not-found, missing/inaccessible constructor and constructor failures.
+      throw new IOException("Unable to instantiate table provider " + connectorImpl, e);
+    }
+  }
+
   /**
    * Checks to see if table contains the given column family
    * @param columnFamily The column family name
@@ -101,12 +125,12 @@ public class HTableConnector extends Connector implements Serializable{
   /**
    * @return the table
    */
-  public HTable getTable() {
+  public HTableInterface getTable() {
     return table;
   }
 
   @Override
-  public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+  public void put(Put put) throws IOException {
       table.put(put);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
new file mode 100644
index 0000000..f7e066c
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
@@ -0,0 +1,17 @@
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+
+/**
+ * {@link TableProvider} that hands out a plain {@link HTable} for the requested table.
+ *
+ * Created by cstella on 2/11/16.
+ */
+public class HTableProvider implements TableProvider {
+    /**
+     * Opens the named table with the supplied configuration.
+     *
+     * @param config    Configuration passed to the {@link HTable} constructor
+     * @param tableName Name of the table to open
+     * @return A newly constructed {@link HTable}
+     * @throws IOException Propagated from the {@link HTable} constructor
+     */
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+        final HTableInterface table = new HTable(config, tableName);
+        return table;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
new file mode 100644
index 0000000..8c0966b
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
@@ -0,0 +1,103 @@
+package org.apache.metron.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Serializable holder for HBase table settings: the table name, client-side write-buffer
+ * options, the configured column families and an optional connector implementation name.
+ *
+ * Created by cstella on 2/11/16.
+ */
+public class TableConfig implements Serializable {
+    static final long serialVersionUID = -1L;
+    private String tableName;
+    private boolean batch = true;
+    protected Map<String, Set<String>> columnFamilies = new HashMap<>();
+    private long writeBufferSize = 0L;
+    private String connectorImpl;
+
+    /**
+     * Creates a configuration with no table name set; see {@link #withTable(String)}.
+     */
+    public TableConfig() {
+
+    }
+
+    /**
+     * @param tableName The name of the HBase table
+     */
+    public TableConfig(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * @return The name of the HBase table, or {@code null} if none has been set
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * @param impl The connector implementation name to record
+     * @return Instance of this class
+     */
+    public TableConfig withConnectorImpl(String impl) {
+        connectorImpl = impl;
+        return this;
+    }
+
+    /**
+     * @param table The name of the HBase table
+     * @return Instance of this class
+     */
+    public TableConfig withTable(String table) {
+        this.tableName = table;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setBatch(boolean)}.
+     *
+     * @param isBatch Whether to enable batch mode; must not be {@code null}
+     * @return Instance of this class
+     * @throws NullPointerException if {@code isBatch} is {@code null}
+     */
+    public TableConfig withBatch(Boolean isBatch) {
+        // Fail with a clear message instead of an anonymous NPE from auto-unboxing.
+        if (isBatch == null) {
+            throw new NullPointerException("isBatch must not be null");
+        }
+        this.batch = isBatch;
+        return this;
+    }
+
+    /**
+     * @return The connector implementation name, or {@code null} if none has been set
+     */
+    public String getConnectorImpl() {
+        return connectorImpl;
+    }
+
+    /**
+     * @return Whether batch mode is enabled
+     */
+    public boolean isBatch() {
+        return batch;
+    }
+
+    /**
+     * @param batch
+     *          Whether to enable HBase's client-side write buffer.
+     *          <p>
+     *          When enabled your bolt will store put operations locally until the
+     *          write buffer is full, so they can be sent to HBase in a single RPC
+     *          call. When disabled each put operation is effectively an RPC and
+     *          is sent straight to HBase. As your bolt can process thousands of
+     *          values per second it is recommended that the write buffer is
+     *          enabled.
+     *          <p>
+     *          Enabled by default
+     */
+    public void setBatch(boolean batch) {
+        this.batch = batch;
+    }
+    /**
+     * @param writeBufferSize
+     *          Overrides the client-side write buffer size.
+     *          <p>
+     *          By default the write buffer size is 2 MB (2097152 bytes). If you
+     *          are storing larger data, you may want to consider increasing this
+     *          value to allow your bolt to efficiently group together a larger
+     *          number of records per RPC
+     *          <p>
+     *          Overrides the write buffer size you have set in your
+     *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
+     */
+    public void setWriteBufferSize(long writeBufferSize) {
+        this.writeBufferSize = writeBufferSize;
+    }
+
+    /**
+     * @return the writeBufferSize
+     */
+    public long getWriteBufferSize() {
+        return writeBufferSize;
+    }
+    /**
+     * @return A read-only view of the configured column family names
+     */
+    public Set<String> getColumnFamilies() {
+        // Read-only so callers cannot remove families from the internal map through the key set.
+        return java.util.Collections.unmodifiableSet(this.columnFamilies.keySet());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
new file mode 100644
index 0000000..d643a83
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
@@ -0,0 +1,14 @@
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Abstraction over how an {@link HTableInterface} handle is obtained.
+ * <p>
+ * The single method, <code>getTable</code>, receives the Hadoop
+ * {@link Configuration} and the name of the table and returns the table
+ * handle; it may throw {@link IOException} if the handle cannot be produced.
+ * <p>
+ * The interface extends {@link Serializable}. Presumably this is so that a
+ * provider can be carried inside serialized topology components - TODO confirm
+ * against the callers.
+ * <p>
+ * Created by cstella on 2/11/16.
+ */
+public interface TableProvider extends Serializable {
+    HTableInterface getTable(Configuration config, String tableName) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
index d2c789a..6bacfb8 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
@@ -1,5 +1,6 @@
 package org.apache.metron.hbase;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -8,28 +9,27 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.common.base.Joiner;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import backtype.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
 
 /**
  * Configuration for Storm {@link Tuple} to HBase serialization.
  */
 @SuppressWarnings("serial")
-public class TupleTableConfig implements Serializable {
-  
+public class TupleTableConfig extends TableConfig implements Serializable {
+  private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
+  static final long serialVersionUID = -1L;
   public static final long DEFAULT_INCREMENT = 1L;
   
-  private String tableName;
   protected String tupleRowKeyField;
   protected String tupleTimestampField;
-  protected Map<String, Set<String>> columnFamilies;
-  private boolean batch = true;
   protected Durability durability = Durability.USE_DEFAULT;
-  private long writeBufferSize = 0L;
   private String fields;
 
   /**
@@ -41,7 +41,7 @@ public class TupleTableConfig implements Serializable {
    *          The {@link Tuple} field used to set the rowKey
    */
   public TupleTableConfig(final String table, final String rowKeyField) {
-    this.tableName = table;
+    super(table);
     this.tupleRowKeyField = rowKeyField;
     this.tupleTimestampField = "";
     this.columnFamilies = new HashMap<String, Set<String>>();
@@ -58,20 +58,18 @@ public class TupleTableConfig implements Serializable {
    *          The {@link Tuple} field used to set the timestamp
    */
   public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
-    this.tableName = table;
+    super(table);
     this.tupleRowKeyField = rowKeyField;
     this.tupleTimestampField = timestampField;
     this.columnFamilies = new HashMap<String, Set<String>>();
   }
 
   public TupleTableConfig() {
+    super(null);
     this.columnFamilies = new HashMap<String, Set<String>>();
   }
 
-  public TupleTableConfig withTable(String table) {
-    this.tableName = table;
-    return this;
-  }
+
 
   public TupleTableConfig withRowKeyField(String rowKeyField) {
     this.tupleRowKeyField = rowKeyField;
@@ -88,10 +86,7 @@ public class TupleTableConfig implements Serializable {
     return this;
   }
 
-  public TupleTableConfig withBatch(Boolean isBatch) {
-    this.batch = isBatch;
-    return this;
-  }
+
 
   public String getFields() {
     return fields;
@@ -125,8 +120,14 @@ public class TupleTableConfig implements Serializable {
    *          The {@link Tuple}
    * @return {@link Put}
    */
-  public Put getPutFromTuple(final Tuple tuple) {
-    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+  public Put getPutFromTuple(final Tuple tuple) throws IOException{
+    byte[] rowKey = null;
+    try {
+      rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    }
+    catch(IllegalArgumentException iae) {
+      throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
+    }
     
     long ts = 0;
     if (!tupleTimestampField.equals("")) {
@@ -225,37 +226,8 @@ public class TupleTableConfig implements Serializable {
     inc.getFamilyMapOfLongs().put(family, set);
   }
   
-  /**
-   * @return the tableName
-   */
-  public String getTableName() {
-    return tableName;
-  }
-  
-  /**
-   * @return Whether batch mode is enabled
-   */
-  public boolean isBatch() {
-    return batch;
-  }
-  
-  /**
-   * @param batch
-   *          Whether to enable HBase's client-side write buffer.
-   *          <p>
-   *          When enabled your bolt will store put operations locally until the
-   *          write buffer is full, so they can be sent to HBase in a single RPC
-   *          call. When disabled each put operation is effectively an RPC and
-   *          is sent straight to HBase. As your bolt can process thousands of
-   *          values per second it is recommended that the write buffer is
-   *          enabled.
-   *          <p>
-   *          Enabled by default
-   */
-  public void setBatch(boolean batch) {
-    this.batch = batch;
-  }
-  
+
+
   /**
    * @param durability
    *          Sets whether to write to HBase's edit log.
@@ -276,36 +248,8 @@ public class TupleTableConfig implements Serializable {
     return  durability;
   }
   
-  /**
-   * @param writeBufferSize
-   *          Overrides the client-side write buffer size.
-   *          <p>
-   *          By default the write buffer size is 2 MB (2097152 bytes). If you
-   *          are storing larger data, you may want to consider increasing this
-   *          value to allow your bolt to efficiently group together a larger
-   *          number of records per RPC
-   *          <p>
-   *          Overrides the write buffer size you have set in your
-   *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
-   */
-  public void setWriteBufferSize(long writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-  }
-  
-  /**
-   * @return the writeBufferSize
-   */
-  public long getWriteBufferSize() {
-    return writeBufferSize;
-  }
-  
-  /**
-   * @return A Set of configured column families
-   */
-  public Set<String> getColumnFamilies() {
-    return this.columnFamilies.keySet();
-  }
-  
+
+
   /**
    * @return the tupleRowKeyField
    */

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java
new file mode 100644
index 0000000..052c7a6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java
@@ -0,0 +1,61 @@
+package org.apache.metron.reference.lookup;
+
+import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
+import org.apache.metron.reference.lookup.handler.Handler;
+
+import java.io.IOException;
+
+/**
+ * A {@link Handler} that decorates another handler with access tracking.
+ * <p>
+ * Each call to {@link #exists} or {@link #get} first records the key in the
+ * configured {@link AccessTracker} (only when <code>logAccess</code> is true)
+ * and then delegates to the configured lookup handler. The lookup handler must
+ * be set before any lookup is made; the access tracker must be set before any
+ * lookup is made with <code>logAccess</code> set to true.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public class Lookup<CONTEXT_T, KEY_T extends LookupKey, RESULT_T> implements Handler<CONTEXT_T, KEY_T, RESULT_T> {
+    private String name;
+    private AccessTracker accessTracker;
+    private Handler<CONTEXT_T, KEY_T, RESULT_T> lookupHandler;
+
+    /**
+     * @return the name given to this lookup
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @param name the name to give this lookup
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the tracker that records accessed keys
+     */
+    public AccessTracker getAccessTracker() {
+        return accessTracker;
+    }
+
+    /**
+     * @param accessTracker the tracker that should record accessed keys
+     */
+    public void setAccessTracker(AccessTracker accessTracker) {
+        this.accessTracker = accessTracker;
+    }
+
+    /**
+     * @return the handler the actual lookups are delegated to
+     */
+    public Handler< CONTEXT_T, KEY_T, RESULT_T > getLookupHandler() {
+        return lookupHandler;
+    }
+
+    /**
+     * @param lookupHandler the handler the actual lookups are delegated to
+     */
+    public void setLookupHandler(Handler< CONTEXT_T, KEY_T, RESULT_T > lookupHandler) {
+        this.lookupHandler = lookupHandler;
+    }
+
+    /**
+     * Optionally records the access, then asks the lookup handler whether the key exists.
+     *
+     * @param key the key to look up
+     * @param context caller supplied context, passed through to the lookup handler
+     * @param logAccess whether to record the key in the access tracker
+     * @return the lookup handler's answer
+     * @throws IOException if the lookup handler fails
+     */
+    @Override
+    public boolean exists(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException {
+        if(logAccess) {
+            accessTracker.logAccess(key);
+        }
+        return lookupHandler.exists(key, context, logAccess);
+    }
+
+    /**
+     * Optionally records the access, then fetches the value for the key from the lookup handler.
+     *
+     * @param key the key to look up
+     * @param context caller supplied context, passed through to the lookup handler
+     * @param logAccess whether to record the key in the access tracker
+     * @return the lookup handler's result
+     * @throws IOException if the lookup handler fails
+     */
+    @Override
+    public RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException {
+        if(logAccess) {
+            accessTracker.logAccess(key);
+        }
+        return lookupHandler.get(key, context, logAccess);
+    }
+
+    /**
+     * Cleans up the access tracker and closes the lookup handler.
+     * <p>
+     * The handler is closed in a finally block so that a failure while cleaning
+     * up the tracker cannot leak the handler. Components that were never set
+     * are skipped, so closing a partially configured lookup is safe.
+     *
+     * @throws Exception if either the tracker cleanup or the handler close fails
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            if(accessTracker != null) {
+                accessTracker.cleanup();
+            }
+        }
+        finally {
+            if(lookupHandler != null) {
+                lookupHandler.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
new file mode 100644
index 0000000..3a227fd
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
@@ -0,0 +1,8 @@
+package org.apache.metron.reference.lookup;
+
+/**
+ * A key that can be used in a lookup and that can render itself as bytes.
+ * <p>
+ * <code>toBytes()</code> returns the byte representation of the key. Within
+ * this commit those bytes are what BloomAccessTracker feeds into its bloom
+ * filter, so two keys that should count as the same access presumably need to
+ * produce identical bytes - TODO confirm with the implementations.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public interface LookupKey {
+    byte[] toBytes();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java
new file mode 100644
index 0000000..0b0846e
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java
@@ -0,0 +1,23 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Records which {@link LookupKey}s have been accessed.
+ * <p>
+ * The interface itself enforces nothing beyond the signatures; the notes below
+ * describe the methods as the implementations added alongside it
+ * (BloomAccessTracker, PersistentAccessTracker) use them:
+ * <ul>
+ *   <li><code>logAccess</code> - record that the key was accessed.</li>
+ *   <li><code>configure</code> - initialise the tracker from a settings map; the
+ *       recognised keys are implementation specific.</li>
+ *   <li><code>hasSeen</code> - whether the key has been recorded. BloomAccessTracker
+ *       answers from a bloom filter's <code>mightContain</code>, so a true answer
+ *       there is not a guarantee.</li>
+ *   <li><code>getName</code> - the name of the tracker.</li>
+ *   <li><code>union</code> - merge the given tracker into this one and return the
+ *       merged tracker; the visible implementations modify and return themselves.</li>
+ *   <li><code>reset</code> - discard what has been recorded so far.</li>
+ *   <li><code>isFull</code> - whether the tracker considers itself at capacity;
+ *       PersistentAccessTracker persists as soon as this becomes true.</li>
+ *   <li><code>cleanup</code> - release whatever the tracker holds; may throw
+ *       {@link IOException}.</li>
+ * </ul>
+ * Trackers are {@link Serializable}; AccessTrackerUtil relies on this to store
+ * them with Java serialization.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public interface AccessTracker extends Serializable{
+    void logAccess(LookupKey key);
+    void configure(Map<String, Object> config);
+    boolean hasSeen(LookupKey key);
+    String getName();
+    AccessTracker union(AccessTracker tracker);
+    void reset();
+    boolean isFull();
+    void cleanup() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java
new file mode 100644
index 0000000..18df030
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java
@@ -0,0 +1,69 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import javax.annotation.Nullable;
+import java.io.*;
+
+/**
+ * Helpers for (de)serializing {@link AccessTracker}s and for storing them in and
+ * loading them from an HBase table. Implemented as a single-element enum, so all
+ * calls go through {@code AccessTrackerUtil.INSTANCE}.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public enum AccessTrackerUtil {
+    INSTANCE;
+
+    /** Column qualifier under which the serialized tracker is written and read. */
+    public static final byte[] COLUMN = Bytes.toBytes("v");
+
+    /**
+     * Rebuilds a tracker from bytes produced by {@link #serializeTracker(AccessTracker)}.
+     *
+     * @param bytes the Java-serialized tracker
+     * @return the deserialized tracker
+     * @throws IOException if the bytes cannot be read as an object stream
+     * @throws ClassNotFoundException if the serialized class is not on the classpath
+     */
+    public AccessTracker deserializeTracker(byte[] bytes) throws IOException, ClassNotFoundException {
+        // try-with-resources so the stream is released even when readObject fails
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            return (AccessTracker) ois.readObject();
+        }
+    }
+
+    /**
+     * Serializes a tracker with Java serialization.
+     *
+     * @param tracker the tracker to serialize
+     * @return the serialized form
+     * @throws IOException if the tracker cannot be written
+     */
+    public byte[] serializeTracker(AccessTracker tracker) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(tracker);
+            oos.flush();
+        }
+        return bos.toByteArray();
+    }
+
+
+    /**
+     * Writes the serialized tracker into the given table, using the key's row key
+     * and the {@link #COLUMN} qualifier in the given column family.
+     *
+     * @param accessTrackerTable the table to write to
+     * @param columnFamily the column family to write to
+     * @param key supplies the row key
+     * @param underlyingTracker the tracker to serialize and store
+     * @throws IOException if serialization or the put fails
+     */
+    public void persistTracker(HTableInterface accessTrackerTable, String columnFamily, PersistentAccessTracker.AccessTrackerKey key, AccessTracker underlyingTracker) throws IOException {
+        Put put = new Put(key.toRowKey());
+        put.add(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker));
+        accessTrackerTable.put(put);
+    }
+
+    /**
+     * Scans the table starting at the row key built from the name and the earliest
+     * timestamp, and lazily deserializes a tracker out of every row returned.
+     * <p>
+     * NOTE(review): the scan only has a start row, and the scanner is handed to the
+     * caller wrapped in a lazy Iterable, so it is never closed here - confirm that
+     * both are intended.
+     *
+     * @param accessTrackerTable the table to scan
+     * @param columnFamily the column family the trackers were stored in
+     * @param name the tracker name used to build the start row
+     * @param earliest the timestamp used to build the start row
+     * @return a lazy view over the deserialized trackers; iterating it throws a
+     *         RuntimeException (with the original failure as its cause) if a row
+     *         cannot be deserialized
+     * @throws IOException if the scanner cannot be opened
+     */
+    public Iterable<AccessTracker> loadAll(HTableInterface accessTrackerTable, final String columnFamily, final String name, final long earliest) throws IOException {
+        Scan scan = new Scan(PersistentAccessTracker.AccessTrackerKey.getTimestampScanKey(name, earliest));
+        ResultScanner scanner = accessTrackerTable.getScanner(scan);
+        return Iterables.transform(scanner, new Function<Result, AccessTracker>() {
+
+            @Nullable
+            @Override
+            public AccessTracker apply(@Nullable Result result) {
+                try {
+                    return deserializeTracker(result.getValue(Bytes.toBytes(columnFamily), COLUMN));
+                } catch (Exception e) {
+                    // keep the cause, otherwise the real failure is lost
+                    throw new RuntimeException("Unable to deserialize " + name + " @ " + earliest, e);
+                }
+            }
+        });
+    }
+
+
+    /**
+     * Folds the given trackers into one by repeatedly calling
+     * {@link AccessTracker#union(AccessTracker)} on the first tracker.
+     *
+     * @param trackers the trackers to merge
+     * @return the merged tracker, or null if there were no trackers
+     * @throws IOException declared for callers; not thrown directly by this method
+     * @throws ClassNotFoundException declared for callers; not thrown directly by this method
+     */
+    public AccessTracker loadAll(Iterable<AccessTracker> trackers) throws IOException, ClassNotFoundException {
+        AccessTracker tracker = null;
+        for(AccessTracker t : trackers) {
+            if(tracker == null) {
+                tracker = t;
+            }
+            else {
+                tracker = tracker.union(t);
+            }
+        }
+        return tracker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java
new file mode 100644
index 0000000..d8edef5
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java
@@ -0,0 +1,132 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An {@link AccessTracker} that remembers accessed keys in a Guava
+ * {@link BloomFilter}.
+ * <p>
+ * The tracker counts the calls to {@link #logAccess(LookupKey)} and reports
+ * itself as full once that count reaches the expected number of insertions the
+ * filter was sized for.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public class BloomAccessTracker implements AccessTracker {
+    private static final long serialVersionUID = 1L;
+    public static final String EXPECTED_INSERTIONS_KEY = "expected_insertions";
+    public static final String FALSE_POSITIVE_RATE_KEY = "false_positive_rate";
+    public static final String NAME_KEY = "name";
+
+    /**
+     * Feeds the bytes of a {@link LookupKey} into the bloom filter's hash.
+     * <p>
+     * All instances compare equal to each other; presumably this is what lets
+     * filters created in different JVMs be merged in {@link #union} - TODO confirm.
+     */
+    private static class LookupKeyFunnel implements Funnel<LookupKey> {
+        @Override
+        public void funnel(LookupKey lookupKey, PrimitiveSink primitiveSink) {
+            primitiveSink.putBytes(lookupKey.toBytes());
+        }
+
+
+        @Override
+        public boolean equals(Object obj) {
+            // null-safe: equals(null) must answer false rather than throw
+            return obj != null && this.getClass().equals(obj.getClass());
+        }
+
+        @Override
+        public int hashCode() {
+            // kept consistent with equals, which only compares classes
+            return getClass().hashCode();
+        }
+    }
+
+    private static final Funnel<LookupKey> LOOKUPKEY_FUNNEL = new LookupKeyFunnel();
+
+    BloomFilter<LookupKey> filter;
+    String name;
+    int expectedInsertions;
+    double falsePositiveRate;
+    int numInsertions = 0;
+
+    /**
+     * @param name the name of the tracker
+     * @param expectedInsertions the number of insertions the filter is sized for
+     * @param falsePositiveRate the false positive rate the filter is sized for
+     */
+    public BloomAccessTracker(String name, int expectedInsertions, double falsePositiveRate) {
+        this.name = name;
+        this.expectedInsertions = expectedInsertions;
+        this.falsePositiveRate = falsePositiveRate;
+        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    }
+
+    /**
+     * Creates an unconfigured tracker; {@link #configure(Map)} must be called
+     * before it is used, because the filter does not exist until then.
+     */
+    public BloomAccessTracker() {}
+
+    /**
+     * @param config the settings, see {@link #configure(Map)}
+     */
+    public BloomAccessTracker(Map<String, Object> config) {
+        configure(config);
+    }
+
+    protected BloomFilter<LookupKey> getFilter() {
+        return filter;
+    }
+
+    /**
+     * Adds the key to the filter and bumps the insertion count.
+     */
+    @Override
+    public void logAccess(LookupKey key) {
+        numInsertions++;
+        filter.put(key);
+    }
+
+    /**
+     * Reads the expected insertions, the false positive rate and the name from
+     * the map (see the <code>*_KEY</code> constants) and creates a new filter
+     * from them. Numeric settings may be given as Strings or as Numbers.
+     */
+    @Override
+    public void configure(Map<String, Object> config) {
+        expectedInsertions = toInt(config.get(EXPECTED_INSERTIONS_KEY));
+        falsePositiveRate = toDouble(config.get(FALSE_POSITIVE_RATE_KEY));
+        name = config.get(NAME_KEY).toString();
+        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    }
+
+    /**
+     * @return the filter's <code>mightContain</code> answer for the key
+     */
+    @Override
+    public boolean hasSeen(LookupKey key) {
+        return filter.mightContain(key);
+    }
+
+    /**
+     * Replaces the filter with an empty one of the same dimensions and restarts
+     * the insertion count, so that {@link #isFull()} reflects the new filter.
+     */
+    @Override
+    public void reset() {
+        filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+        // without this the tracker would stay "full" forever after the first fill
+        numInsertions = 0;
+    }
+
+    /**
+     * Converts a String or Number setting to a double.
+     */
+    private static double toDouble(Object o) {
+        if(o instanceof String) {
+            return Double.parseDouble((String)o);
+        }
+        else if(o instanceof Number) {
+            return ((Number) o).doubleValue();
+        }
+        else {
+            throw new IllegalStateException("Unable to convert " + o + " to a double.");
+        }
+    }
+
+    /**
+     * Converts a String or Number setting to an int.
+     */
+    private static int toInt(Object o) {
+        if(o instanceof String) {
+            return Integer.parseInt((String)o);
+        }
+        else if(o instanceof Number) {
+            return ((Number) o).intValue();
+        }
+        else {
+            throw new IllegalStateException("Unable to convert " + o + " to an int.");
+        }
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+
+    /**
+     * Merges the other tracker's filter into this tracker's filter.
+     *
+     * @param tracker the tracker to merge in; must be a BloomAccessTracker
+     * @return this tracker
+     * @throws IllegalStateException if this tracker has no filter yet or the
+     *         other tracker is not a BloomAccessTracker
+     */
+    @Override
+    public AccessTracker union(AccessTracker tracker) {
+        if(filter == null) {
+            throw new IllegalStateException("Unable to union access tracker, because this tracker is not initialized.");
+        }
+        if(tracker instanceof BloomAccessTracker ) {
+            filter.putAll(((BloomAccessTracker)tracker).getFilter());
+            return this;
+        }
+        else {
+            throw new IllegalStateException("Unable to union access tracker, because it's not of the right type (BloomAccessTracker)");
+        }
+    }
+
+    /**
+     * @return true once as many keys have been logged as the filter was sized for
+     */
+    @Override
+    public boolean isFull() {
+        return numInsertions >= expectedInsertions;
+    }
+
+    /**
+     * Nothing to release.
+     */
+    @Override
+    public void cleanup() throws IOException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java
new file mode 100644
index 0000000..9e239a8
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java
@@ -0,0 +1,192 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.log4j.Logger;
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.*;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * An {@link AccessTracker} that wraps another tracker and writes it to an HBase
+ * table, then resets it, whenever it is full, whenever the configured interval
+ * has elapsed, and once more on {@link #cleanup()}.
+ * <p>
+ * Access to the wrapped tracker is guarded by a single lock because the periodic
+ * persist runs on a {@link Timer} thread.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public class PersistentAccessTracker implements AccessTracker {
+    private static final Logger LOG = Logger.getLogger(PersistentAccessTracker.class);
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Row key of a persisted tracker: the tracker name, the timestamp and the
+     * container name, written in that order with a {@link DataOutputStream}.
+     */
+    public static class AccessTrackerKey {
+        String name;
+        String containerName;
+        long timestamp;
+        public AccessTrackerKey(String name, String containerName, long timestamp) {
+            this.name = name;
+            this.containerName = containerName;
+            this.timestamp = timestamp;
+        }
+
+        /**
+         * @return the row key bytes: name, timestamp, container name
+         */
+        public byte[] toRowKey() {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(os);
+            try {
+                dos.writeUTF(name);
+                dos.writeLong(timestamp);
+                dos.writeUTF(containerName);
+                dos.flush();
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to write rowkey: " + this, e);
+            }
+
+            return os.toByteArray();
+        }
+
+        /**
+         * @return the row key prefix made of name and timestamp only, usable as
+         *         the start row of a scan
+         */
+        public static byte[] getTimestampScanKey(String name, long timestamp) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(os);
+            try {
+                dos.writeUTF(name);
+                dos.writeLong(timestamp);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to create scan key " , e);
+            }
+
+            return os.toByteArray();
+        }
+
+        /**
+         * Inverse of {@link #toRowKey()}.
+         */
+        public static AccessTrackerKey fromRowKey(byte[] rowKey) {
+            ByteArrayInputStream is = new ByteArrayInputStream(rowKey);
+            DataInputStream dis = new DataInputStream(is);
+            try {
+                String name = dis.readUTF();
+                long timestamp = dis.readLong();
+                String containerName = dis.readUTF();
+                return new AccessTrackerKey(name, containerName, timestamp);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to read rowkey: ", e);
+            }
+        }
+    }
+
+    /**
+     * Timer task that asks the tracker to persist itself if the interval has elapsed.
+     */
+    private static class Persister extends TimerTask {
+        PersistentAccessTracker tracker;
+        public Persister(PersistentAccessTracker tracker) {
+            this.tracker = tracker;
+        }
+        /**
+         * The action to be performed by this timer task.
+         */
+        @Override
+        public void run() {
+            tracker.persist(false);
+        }
+    }
+
+    final Object sync = new Object();
+    HTableInterface accessTrackerTable;
+    String accessTrackerColumnFamily;
+    AccessTracker underlyingTracker;
+    long timestamp = System.currentTimeMillis();
+    String name;
+    String containerName;
+    private Timer timer;
+    long maxMillisecondsBetweenPersists;
+
+    /**
+     * @param name the tracker name, first part of the row key
+     * @param containerName the container name, last part of the row key
+     * @param accessTrackerTable the table the tracker is persisted to; closed by {@link #cleanup()}
+     * @param columnFamily the column family the tracker is persisted to
+     * @param underlyingTracker the tracker that does the actual tracking
+     * @param maxMillisecondsBetweenPersists the interval of the periodic persist;
+     *        no periodic persist is scheduled when this is not positive
+     */
+    public PersistentAccessTracker( String name
+                                  , String containerName
+                                  , HTableInterface accessTrackerTable
+                                  , String columnFamily
+                                  , AccessTracker underlyingTracker
+                                  , long maxMillisecondsBetweenPersists
+                                  )
+    {
+        this.containerName = containerName;
+        this.accessTrackerTable = accessTrackerTable;
+        this.name = name;
+        this.accessTrackerColumnFamily = columnFamily;
+        this.underlyingTracker = underlyingTracker;
+        this.maxMillisecondsBetweenPersists = maxMillisecondsBetweenPersists;
+        if(maxMillisecondsBetweenPersists > 0) {
+            // only start the timer thread when there is something to schedule
+            timer = new Timer();
+            timer.scheduleAtFixedRate(new Persister(this), maxMillisecondsBetweenPersists, maxMillisecondsBetweenPersists);
+        }
+    }
+
+    /**
+     * Writes the wrapped tracker to the table and resets it.
+     * <p>
+     * A failed write is logged and the tracker is left untouched, so its content
+     * is part of the next attempt.
+     *
+     * @param force persist even if the interval has not elapsed yet
+     */
+    public void persist(boolean force) {
+        synchronized(sync) {
+            if(force || (System.currentTimeMillis() - timestamp) >= maxMillisecondsBetweenPersists) {
+                //persist
+                try {
+                    AccessTrackerUtil.INSTANCE.persistTracker(accessTrackerTable, accessTrackerColumnFamily, new AccessTrackerKey(name, containerName, timestamp), underlyingTracker);
+                    timestamp = System.currentTimeMillis();
+                    reset();
+                } catch (IOException e) {
+                    LOG.error("Unable to persist access tracker.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Records the key in the wrapped tracker and persists it if that made it full.
+     */
+    @Override
+    public void logAccess(LookupKey key) {
+        synchronized (sync) {
+            underlyingTracker.logAccess(key);
+            if (isFull()) {
+                persist(true);
+            }
+        }
+    }
+
+    @Override
+    public void configure(Map<String, Object> config) {
+        underlyingTracker.configure(config);
+    }
+
+    @Override
+    public boolean hasSeen(LookupKey key) {
+        synchronized(sync) {
+            return underlyingTracker.hasSeen(key);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return underlyingTracker.getName();
+    }
+
+    /**
+     * Merges the tracker wrapped by the given PersistentAccessTracker into the
+     * tracker wrapped by this one.
+     *
+     * @param tracker must be a PersistentAccessTracker
+     * @return this tracker
+     */
+    @Override
+    public AccessTracker union(AccessTracker tracker) {
+        PersistentAccessTracker t1 = (PersistentAccessTracker)tracker;
+        // same lock as every other access to the wrapped tracker
+        synchronized(sync) {
+            underlyingTracker = underlyingTracker.union(t1.underlyingTracker);
+        }
+        return this;
+    }
+
+    @Override
+    public void reset() {
+        synchronized(sync) {
+            underlyingTracker.reset();
+        }
+    }
+
+    @Override
+    public boolean isFull() {
+        synchronized (sync) {
+            return underlyingTracker.isFull();
+        }
+    }
+
+    /**
+     * Stops the periodic persist, persists one last time, cleans up the wrapped
+     * tracker and closes the table.
+     * <p>
+     * The timer is cancelled first so that no periodic persist is scheduled
+     * against a closed table and the timer thread can end. The table is closed
+     * even if cleaning up the wrapped tracker fails.
+     *
+     * @throws IOException if cleaning up the wrapped tracker or closing the table fails
+     */
+    @Override
+    public void cleanup() throws IOException {
+        synchronized(sync) {
+            if(timer != null) {
+                timer.cancel();
+            }
+            try {
+                persist(true);
+            }
+            catch(Throwable t) {
+                LOG.error("Unable to persist underlying tracker", t);
+            }
+            try {
+                underlyingTracker.cleanup();
+            }
+            finally {
+                accessTrackerTable.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java
new file mode 100644
index 0000000..096e206
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java
@@ -0,0 +1,13 @@
+package org.apache.metron.reference.lookup.handler;
+
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.IOException;
+
+/**
+ * A lookup backend: <code>exists</code> answers whether a key is present and
+ * <code>get</code> fetches the result for a key.
+ * <p>
+ * Both methods receive the key, a caller supplied context object and a
+ * <code>logAccess</code> flag, and may throw {@link IOException}. Lookup, the
+ * implementation added alongside this interface, uses the flag to decide
+ * whether the key is recorded in its access tracker before delegating to
+ * another handler; what other implementations do with the flag cannot be told
+ * from here.
+ * <p>
+ * Handlers are {@link AutoCloseable}.
+ * <p>
+ * Created by cstella on 2/5/16.
+ */
+public interface Handler<CONTEXT_T, KEY_T extends LookupKey, RESULT_T> extends AutoCloseable{
+    boolean exists(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException;
+    RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException;
+}