You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/10 21:17:42 UTC

[4/4] incubator-metron git commit: METRON-33 Execute Enrichments in Parallel (merrimanr via cestella) closes apache/incubator-metron#19

METRON-33 Execute Enrichments in Parallel (merrimanr via cestella) closes apache/incubator-metron#19


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/905b09f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/905b09f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/905b09f6

Branch: refs/heads/master
Commit: 905b09f629a936fedf1fb5468d5d3ce357bcd9d2
Parents: 973b1bb
Author: merrimanr <me...@gmail.com>
Authored: Wed Feb 10 15:17:11 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Wed Feb 10 15:17:11 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/metron/bolt/JoinBolt.java   | 105 ++++++
 .../java/org/apache/metron/bolt/SplitBolt.java  |  83 +++++
 .../org/apache/metron/domain/Enrichment.java    |  37 ++
 .../interfaces/EnrichmentAdapter.java           |   5 +-
 .../metron/parser/interfaces/MessageFilter.java |   7 +-
 .../metron/parser/interfaces/MessageParser.java |  10 +-
 .../java/org/apache/metron/pcap/PcapParser.java | 183 ----------
 .../adapters/cif/AbstractCIFAdapter.java        |   4 +
 .../adapters/cif/CIFHbaseAdapter.java           |  14 +-
 .../adapters/geo/AbstractGeoAdapter.java        |  61 ----
 .../enrichment/adapters/geo/GeoAdapter.java     |  54 +++
 .../adapters/geo/GeoMysqlAdapter.java           | 186 ----------
 .../adapters/host/AbstractHostAdapter.java      |   7 +-
 .../adapters/host/HostFromJSONListAdapter.java  |  50 +++
 .../adapters/jdbc/BaseJdbcConfig.java           |  53 +++
 .../enrichment/adapters/jdbc/JdbcAdapter.java   |  61 ++++
 .../enrichment/adapters/jdbc/JdbcConfig.java    |   9 +
 .../enrichment/adapters/jdbc/MySqlConfig.java   |  22 ++
 .../adapters/threat/AbstractThreatAdapter.java  |   4 +
 .../adapters/threat/ThreatHbaseAdapter.java     |  12 +-
 .../adapters/whois/AbstractWhoisAdapter.java    |  40 ---
 .../adapters/whois/WhoisHBaseAdapter.java       |  21 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |  57 +++
 .../enrichment/bolt/GenericEnrichmentBolt.java  | 186 ++++++++++
 .../common/AbstractEnrichmentBolt.java          | 128 -------
 .../common/GenericEnrichmentBolt.java           | 279 ---------------
 .../adapters/geo/GeoMysqlAdapterTest.java       |  30 +-
 metron-streaming/Metron-MessageParsers/pom.xml  |   5 +
 .../org/apache/metron/bolt/PcapParserBolt.java  |  27 ++
 .../apache/metron/bolt/TelemetryParserBolt.java | 137 +++++++
 .../apache/metron/filters/BroMessageFilter.java |  19 +-
 .../metron/filters/GenericMessageFilter.java    |  10 +-
 .../metron/parsing/AbstractParserBolt.java      | 144 --------
 .../apache/metron/parsing/PcapParserBolt.java   | 227 ------------
 .../metron/parsing/TelemetryParserBolt.java     | 222 ------------
 .../metron/parsing/parsers/AbstractParser.java  |  48 ---
 .../metron/parsing/parsers/BasicBroParser.java  |  23 +-
 .../parsing/parsers/BasicFireEyeParser.java     |  86 ++---
 .../metron/parsing/parsers/BasicIseParser.java  |  41 ++-
 .../parsing/parsers/BasicLancopeParser.java     |  32 +-
 .../parsing/parsers/BasicLogstashParser.java    |  26 +-
 .../parsers/BasicPaloAltoFirewallParser.java    | 355 ++++++++++---------
 .../metron/parsing/parsers/BasicParser.java     |  54 +++
 .../parsing/parsers/BasicSnortParser.java       |  36 +-
 .../parsing/parsers/BasicSourcefireParser.java  |  32 +-
 .../metron/parsing/parsers/BasicYafParser.java  |  18 +-
 .../metron/parsing/parsers/GrokAsaParser.java   |  41 +--
 .../parsing/parsers/GrokSourcefireParser.java   |  25 +-
 .../metron/parsing/parsers/GrokUtils.java       |  26 --
 .../metron/parsing/parsers/JSONCleaner.java     |   4 +-
 .../metron/parsing/parsers/MetronGrok.java      |   1 +
 .../metron/parsing/parsers/ParserUtils.java     |  23 --
 .../metron/parsing/parsers/PcapParser.java      |  89 +++--
 .../apache/metron/parsing/utils/GrokUtils.java  |  26 ++
 .../metron/parsing/utils/ParserUtils.java       |  55 +++
 .../metron/parsing/test/BasicBroParserTest.java | 128 +++----
 .../parsing/test/BasicFireEyeParserTest.java    |  10 +-
 .../metron/parsing/test/BasicIseParserTest.java |   4 +-
 .../parsing/test/BasicLancopeParserTest.java    |   2 +-
 .../test/BasicPaloAltoFirewallParserTest.java   |   5 +-
 .../parsing/test/BasicSourcefireParserTest.java |   4 +-
 .../metron/parsing/test/BroParserTest.java      |   2 +-
 .../metron/parsing/test/GrokAsaParserTest.java  |   4 +-
 .../test/spouts/GenericInternalTestSpout.java   |   8 +-
 .../etc/whitelists/known_hosts.conf             |   3 +-
 .../Metron_Configs/topologies/asa/local.yaml    | 187 +++++-----
 .../Metron_Configs/topologies/asa/remote.yaml   | 191 +++++-----
 .../Metron_Configs/topologies/bro/local.yaml    |  11 +-
 .../Metron_Configs/topologies/bro/remote.yaml   |  11 +-
 .../topologies/fireeye/local.yaml               | 187 +++++-----
 .../topologies/fireeye/remote.yaml              | 191 +++++-----
 .../Metron_Configs/topologies/ise/local.yaml    |  11 +-
 .../Metron_Configs/topologies/ise/remote.yaml   |  11 +-
 .../topologies/lancope/local.yaml               | 163 ++++++---
 .../topologies/lancope/remote.yaml              | 167 ++++++---
 .../topologies/paloalto/local.yaml              |  11 +-
 .../topologies/paloalto/remote.yaml             |  11 +-
 .../Metron_Configs/topologies/pcap/local.yaml   | 121 ++++++-
 .../Metron_Configs/topologies/pcap/remote.yaml  | 136 ++++++-
 .../Metron_Configs/topologies/snort/local.yaml  |  13 +-
 .../Metron_Configs/topologies/snort/remote.yaml |  11 +-
 .../topologies/sourcefire/local.yaml            | 187 +++++-----
 .../topologies/sourcefire/remote.yaml           | 191 +++++-----
 .../Metron_Configs/topologies/yaf/local.yaml    |  11 +-
 .../Metron_Configs/topologies/yaf/remote.yaml   |  11 +-
 .../integration/pcap/PcapIntegrationTest.java   |  57 +--
 .../util/integration/ComponentRunner.java       |   2 +-
 .../integration/util/mock/MockGeoAdapter.java   |  31 ++
 88 files changed, 2843 insertions(+), 2790 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
new file mode 100644
index 0000000..aec3fd0
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
@@ -0,0 +1,105 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class JoinBolt<V> extends BaseRichBolt {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(JoinBolt.class);
+  protected OutputCollector collector;
+  protected ImmutableSet<String> streamIds;
+
+  protected transient CacheLoader<String, Map<String, V>> loader;
+  protected transient LoadingCache<String, Map<String, V>> cache;
+  protected Long maxCacheSize;
+  protected Long maxTimeRetain;
+
+  public JoinBolt withMaxCacheSize(long maxCacheSize) {
+    this.maxCacheSize = maxCacheSize;
+    return this;
+  }
+
+  public JoinBolt withMaxTimeRetain(long maxTimeRetain) {
+    this.maxTimeRetain = maxTimeRetain;
+    return this;
+  }
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+    this.collector = outputCollector;
+    if (this.maxCacheSize == null)
+      throw new IllegalStateException("maxCacheSize must be specified");
+    if (this.maxTimeRetain == null)
+      throw new IllegalStateException("maxTimeRetain must be specified");
+    loader = new CacheLoader<String, Map<String, V>>() {
+      public Map<String, V> load(String key) throws Exception {
+        return new HashMap<>();
+      }
+    };
+    cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
+            .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+            .build(loader);
+    Set<String> temp = getStreamIds();
+    temp.add("message");
+    streamIds = ImmutableSet.copyOf(temp);
+    prepare(map, topologyContext);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    String streamId = tuple.getSourceStreamId();
+    String key = (String) tuple.getValueByField("key");
+    V value = (V) tuple.getValueByField("message");
+    try {
+      Map<String, V> streamValueMap = cache.get(key);
+      if (streamValueMap.containsKey(streamId)) {
+        LOG.warn(String.format("Received key %s twice for " +
+                "stream %s", key, streamId));
+      }
+      streamValueMap.put(streamId, value);
+      Set<String> streamValueKeys = streamValueMap.keySet();
+      if (streamValueKeys.size() == streamIds.size() && Sets.symmetricDifference
+              (streamValueKeys, streamIds)
+              .isEmpty()) {
+        collector.emit("message", tuple, new Values(key, joinValues
+                (streamValueMap)));
+        collector.ack(tuple);
+        cache.invalidate(key);
+      } else {
+        cache.put(key, streamValueMap);
+      }
+    } catch (ExecutionException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("message", new Fields("key", "message"));
+  }
+
+  public abstract void prepare(Map map, TopologyContext topologyContext);
+
+  public abstract Set<String> getStreamIds();
+
+  public abstract V joinValues(Map<String, V> streamValueMap);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
new file mode 100644
index 0000000..ad4edb7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
@@ -0,0 +1,83 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public abstract class SplitBolt<T> extends
+        BaseRichBolt {
+
+  protected OutputCollector collector;
+  private Set<String> streamIds;
+
+  @Override
+  public final void prepare(Map map, TopologyContext topologyContext,
+                       OutputCollector outputCollector) {
+    collector = outputCollector;
+    streamIds = ImmutableSet.copyOf(getStreamIds());
+    prepare(map, topologyContext);
+  }
+
+  @Override
+  public final void execute(Tuple tuple) {
+    emit(tuple, generateMessages(tuple));
+  }
+
+  @Override
+  public final void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("message", new Fields("key", "message"));
+    for (String streamId : getStreamIds()) {
+      declarer.declareStream(streamId, new Fields("key", "message"));
+    }
+    declarer.declareStream("error", new Fields("message"));
+    declareOther(declarer);
+  }
+
+  public void emit(Tuple tuple, List<T> messages) {
+    for(T message: messages) {
+      String key = getKey(tuple, message);
+      collector.emit("message", tuple, new Values(key, message));
+      Map<String, T> streamValueMap = splitMessage(message);
+      for (String streamId : streamIds) {
+        T streamValue = streamValueMap.get(streamId);
+        if (streamValue == null) {
+          streamValue = getDefaultValue(streamId);
+        }
+        collector.emit(streamId, new Values(key, streamValue));
+      }
+      collector.ack(tuple);
+    }
+    emitOther(tuple, messages);
+  }
+
+  protected T getDefaultValue(String streamId) {
+    throw new IllegalArgumentException("Could not find a message for" +
+            " stream: " + streamId);
+  }
+
+  public abstract void prepare(Map map, TopologyContext topologyContext);
+
+  public abstract Set<String> getStreamIds();
+
+  public abstract String getKey(Tuple tuple, T message);
+
+  public abstract List<T> generateMessages(Tuple tuple);
+
+  public abstract Map<String, T> splitMessage(T message);
+
+  public abstract void declareOther(OutputFieldsDeclarer declarer);
+
+  public abstract void emitOther(Tuple tuple, List<T> messages);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
new file mode 100644
index 0000000..8567357
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
@@ -0,0 +1,37 @@
+package org.apache.metron.domain;
+
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
+
+  private String name;
+  private List<String> fields;
+  private T adapter;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public List<String> getFields() {
+    return fields;
+  }
+
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
+  public T getAdapter() {
+    return adapter;
+  }
+
+  public void setAdapter(T adapter) {
+    this.adapter = adapter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
index c0eb0e0..8eb5937 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -19,8 +19,9 @@ package org.apache.metron.enrichment.interfaces;
 
 import org.json.simple.JSONObject;
 
-public interface EnrichmentAdapter
+public interface EnrichmentAdapter<T>
 {
-	JSONObject enrich(String metadata);
+	JSONObject enrich(T value);
 	boolean initializeAdapter();
+	void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
index 88a3543..a3c0089 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
@@ -1,10 +1,7 @@
 package org.apache.metron.parser.interfaces;
 
-import org.json.simple.JSONObject;
+public interface MessageFilter<T> {
 
-
-public interface MessageFilter {
-
-	public boolean emitTuple(JSONObject message);
+	boolean emitTuple(T message);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
index f09484a..a4fd2d8 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
@@ -1,11 +1,11 @@
 package org.apache.metron.parser.interfaces;
 
-import org.json.simple.JSONObject;
+import java.util.List;
+
+public interface MessageParser<T> {
 
-public interface MessageParser {
-	
-	void initializeParser();
 	void init();
-	JSONObject parse(byte[] raw_message);
+	List<T> parse(byte[] rawMessage);
+	boolean validate(T message);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java
deleted file mode 100644
index 18874c2..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package org.apache.metron.pcap;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
-import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
-import org.krakenapps.pcap.decoder.ethernet.EthernetType;
-import org.krakenapps.pcap.decoder.ip.IpDecoder;
-import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
-import org.krakenapps.pcap.decoder.tcp.TcpPacket;
-import org.krakenapps.pcap.decoder.udp.UdpPacket;
-import org.krakenapps.pcap.file.GlobalHeader;
-import org.krakenapps.pcap.packet.PacketHeader;
-import org.krakenapps.pcap.packet.PcapPacket;
-import org.krakenapps.pcap.util.Buffer;
-
-// TODO: Auto-generated Javadoc
-/**
- * The Class PcapParser.
- * 
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public final class PcapParser {
-
-  /** The Constant LOG. */
-  private static final Logger LOG = Logger.getLogger(PcapParser.class);
-
-  /** The ETHERNET_DECODER. */
-  private static final EthernetDecoder ETHERNET_DECODER = new EthernetDecoder();
-
-  /** The ip decoder. */
-  private static final IpDecoder IP_DECODER = new IpDecoder();
-
-  // /** The tcp decoder. */
-  // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
-  // TcpPortProtocolMapper());
-  //
-  // /** The udp decoder. */
-  // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
-  // UdpPortProtocolMapper());
-
-  static {
-    // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
-    // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
-    ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
-  }
-
-  /**
-   * Instantiates a new pcap parser.
-   */
-  private PcapParser() { // $codepro.audit.disable emptyMethod
-
-  }
-
-  /**
-   * Parses the.
-   * 
-   * @param tcpdump
-   *          the tcpdump
-   * @return the list * @throws IOException Signals that an I/O exception has
-   *         occurred. * @throws IOException * @throws IOException * @throws
-   *         IOException
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  public static List<PacketInfo> parse(byte[] tcpdump) throws IOException {
-    List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
-
-    PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(tcpdump);
-
-    GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
-    while (true) {
-      try
-
-      {
-        PcapPacket packet = pcapByteInputStream.getPacket();
-        // int packetCounter = 0;
-        // PacketHeader packetHeader = null;
-        // Ipv4Packet ipv4Packet = null;
-        TcpPacket tcpPacket = null;
-        UdpPacket udpPacket = null;
-        // Buffer packetDataBuffer = null;
-        int sourcePort = 0;
-        int destinationPort = 0;
-
-        // LOG.trace("Got packet # " + ++packetCounter);
-
-        // LOG.trace(packet.getPacketData());
-        ETHERNET_DECODER.decode(packet);
-
-        PacketHeader packetHeader = packet.getPacketHeader();
-        Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
-
-        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
-          tcpPacket = TcpPacket.parse(ipv4Packet);
-
-        }
-
-        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
-
-          Buffer packetDataBuffer = ipv4Packet.getData();
-          sourcePort = packetDataBuffer.getUnsignedShort();
-          destinationPort = packetDataBuffer.getUnsignedShort();
-
-          udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
-
-          udpPacket.setLength(packetDataBuffer.getUnsignedShort());
-          udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
-          packetDataBuffer.discardReadBytes();
-          udpPacket.setData(packetDataBuffer);
-        }
-
-        packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
-            ipv4Packet, tcpPacket, udpPacket));
-      } catch (NegativeArraySizeException ignored) {
-        LOG.debug("Ignorable exception while parsing packet.", ignored);
-      } catch (EOFException eof) { // $codepro.audit.disable logExceptions
-        // Ignore exception and break
-        break;
-      }
-    }
-    return packetInfoList;
-  }
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the arguments
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   * @throws InterruptedException
-   *           the interrupted exception
-   */
-  public static void main(String[] args) throws IOException,
-      InterruptedException {
-
-    double totalIterations = 1000000;
-    double parallelism = 64;
-    double targetEvents = 1000000;
-
-    File fin = new File("/Users/sheetal/Downloads/udp.pcap");
-    File fout = new File(fin.getAbsolutePath() + ".parsed");
-    byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
-    long startTime = System.currentTimeMillis();
-    for (int i = 0; i < totalIterations; i++) {
-      List<PacketInfo> list = parse(pcapBytes);
-
-      for (PacketInfo packetInfo : list) {
-        // FileUtils.writeStringToFile(fout, packetInfo.getJsonDoc(), true);
-        // FileUtils.writeStringToFile(fout, "\n", true);
-        // System.out.println(packetInfo.getJsonDoc());
-      }
-    }
-    long endTime = System.currentTimeMillis();
-
-    System.out.println("Time taken to process " + totalIterations + " events :"
-        + (endTime - startTime) + " milliseconds");
-
-    System.out
-        .println("With parallelism of "
-            + parallelism
-            + " estimated time to process "
-            + targetEvents
-            + " events: "
-            + (((((endTime - startTime) / totalIterations) * targetEvents) / parallelism) / 1000)
-            + " seconds");
-    System.out.println("With parallelism of " + parallelism
-        + " estimated # of events per second: "
-        + ((parallelism * 1000 * totalIterations) / (endTime - startTime))
-        + " events");
-    System.out.println("Expected Parallelism to process " + targetEvents
-        + " events in a second: "
-        + (targetEvents / ((1000 * totalIterations) / (endTime - startTime))));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
index 1eff2eb..57c60e5 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
@@ -38,4 +38,8 @@ public abstract class AbstractCIFAdapter implements EnrichmentAdapter,Serializab
 	abstract public String enrichByDomain(String metadata);
 	abstract public String enrichByEmail(String metadata);
 
+	@Override
+	public void cleanup() {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index 33b0350..bfe9ef6 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -18,9 +18,11 @@
 package org.apache.metron.enrichment.adapters.cif;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -33,7 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.log4j.Logger;
 
 @SuppressWarnings("unchecked")
-public class CIFHbaseAdapter extends AbstractCIFAdapter {
+public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
 
 	private static final long serialVersionUID = 1L;
 	private String _tableName;
@@ -107,19 +109,23 @@ public class CIFHbaseAdapter extends AbstractCIFAdapter {
 		return false;
 	}
 
-	@Override
+
 	public String enrichByIP(String metadata) {
 		return null;
 	}
 
-	@Override
+
 	public String enrichByDomain(String metadata) {
 		return null;
 	}
 
-	@Override
+
 	public String enrichByEmail(String metadata) {
 		return null;
 	}
 
+	@Override
+	public void cleanup() {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
deleted file mode 100644
index e0ca1cd..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.geo;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.enrichment.common.GenericEnrichmentBolt;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-@SuppressWarnings("serial")
-public abstract class AbstractGeoAdapter implements EnrichmentAdapter,
-		Serializable {
-
-	protected static final Logger _LOG = LoggerFactory
-			.getLogger(GenericEnrichmentBolt.class);
-
-	abstract public JSONObject enrich(String metadata);
-
-	abstract public boolean initializeAdapter();
-
-	/**
-	 * Check if we can reach the IP where geo data is storred
-	 * 
-	 * @param ip
-	 *            - ip of geo database
-	 * @param timeout
-	 *            - timeout for a connection attempt
-	 * @return - True if can connect, False if cannot
-	 * @throws Exception
-	 */
-	public boolean checkIfReachable(String ip, int timeout) throws Exception {
-		boolean reachable = InetAddress.getByName(ip).isReachable(timeout);
-
-		if (!reachable)
-			return false;
-
-		return true;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
new file mode 100644
index 0000000..89721a7
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -0,0 +1,54 @@
+package org.apache.metron.enrichment.adapters.geo;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
+import org.json.simple.JSONObject;
+
+import java.net.InetAddress;
+import java.sql.ResultSet;
+
+public class GeoAdapter extends JdbcAdapter {
+
+  private InetAddressValidator ipvalidator = new InetAddressValidator();
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public JSONObject enrich(String value) {
+    JSONObject enriched = new JSONObject();
+    try {
+      InetAddress addr = InetAddress.getByName(value);
+      if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
+              || addr.isSiteLocalAddress() || addr.isMulticastAddress()
+              || !ipvalidator.isValidInet4Address(value)) {
+        return new JSONObject();
+      }
+      String locidQuery = "select IPTOLOCID(\"" + value
+              + "\") as ANS";
+      ResultSet resultSet = statement.executeQuery(locidQuery);
+      String locid = null;
+      if (resultSet.next()) {
+        locid = resultSet.getString("ANS");
+      }
+      resultSet.close();
+      if (locid == null) return new JSONObject();
+      String geoQuery = "select * from location where locID = " + locid;
+      resultSet = statement.executeQuery(geoQuery);
+      if (resultSet.next()) {
+        enriched.put("locID", resultSet.getString("locID"));
+        enriched.put("country", resultSet.getString("country"));
+        enriched.put("city", resultSet.getString("city"));
+        enriched.put("postalCode", resultSet.getString("postalCode"));
+        enriched.put("latitude", resultSet.getString("latitude"));
+        enriched.put("longitude", resultSet.getString("longitude"));
+        enriched.put("dmaCode", resultSet.getString("dmaCode"));
+        enriched.put("location_point", enriched.get("longitude") + "," + enriched.get("latitude"));
+      }
+      resultSet.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      _LOG.error("Enrichment failure: " + e);
+      return new JSONObject();
+    }
+    return enriched;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
deleted file mode 100644
index 91f2fac..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.geo;
-
-import java.net.InetAddress;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-
-import org.apache.commons.validator.routines.InetAddressValidator;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings("serial")
-public class GeoMysqlAdapter extends AbstractGeoAdapter {
-
-	private Connection connection = null;
-	private Statement statement = null;
-	private String _ip;
-	private String _username;
-	private String _password;
-	private String _tablename;
-	private InetAddressValidator ipvalidator = new InetAddressValidator();
-
-	public GeoMysqlAdapter(String ip, int port, String username,
-			String password, String tablename) {
-		try {
-			_ip = InetAddress.getByName(ip).getHostAddress();
-
-			boolean reachable = checkIfReachable(ip, 500);
-
-			if (!reachable)
-				throw new Exception("Unable to reach IP " + _ip
-						+ " with username " + _username + " and password "
-						+ _password + " accessing table name " + _tablename);
-
-		} catch (Exception e) {
-			_LOG.error("Environment misconfigured, cannot reach MYSQL server....");
-			e.printStackTrace();
-		}
-
-		_username = username;
-		_password = password;
-		_tablename = tablename;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public JSONObject enrich(String metadata) {
-
-		ResultSet resultSet = null;
-
-		try {
-
-			_LOG.trace("[Metron] Received metadata: " + metadata);
-
-			InetAddress addr = InetAddress.getByName(metadata);
-
-			if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
-					|| addr.isSiteLocalAddress() || addr.isMulticastAddress()
-					|| !ipvalidator.isValidInet4Address(metadata)) {
-				_LOG.trace("[Metron] Not a remote IP: " + metadata);
-				_LOG.trace("[Metron] Returning enrichment: " + "{}");
-
-				return new JSONObject();
-			}
-
-			_LOG.trace("[Metron] Is a valid remote IP: " + metadata);
-
-			statement = connection.createStatement(
-					ResultSet.TYPE_SCROLL_INSENSITIVE,
-					ResultSet.CONCUR_READ_ONLY);
-			String locid_query = "select IPTOLOCID(\"" + metadata
-					+ "\") as ANS";
-			resultSet = statement.executeQuery(locid_query);
-
-			if (resultSet == null)
-				throw new Exception("Invalid result set for metadata: "
-						+ metadata + ". Query run was: " + locid_query);
-
-			resultSet.last();
-			int size = resultSet.getRow();
-
-			if (size == 0)
-				throw new Exception("No result returned for: " + metadata
-						+ ". Query run was: " + locid_query);
-
-			resultSet.beforeFirst();
-			resultSet.next();
-
-			String locid = null;
-			locid = resultSet.getString("ANS");
-
-			if (locid == null)
-				throw new Exception("Invalid location id for: " + metadata
-						+ ". Query run was: " + locid_query);
-
-			String geo_query = "select * from location where locID = " + locid
-					+ ";";
-			resultSet = statement.executeQuery(geo_query);
-
-			if (resultSet == null)
-				throw new Exception(
-						"Invalid result set for metadata and locid: "
-								+ metadata + ", " + locid + ". Query run was: "
-								+ geo_query);
-
-			resultSet.last();
-			size = resultSet.getRow();
-
-			if (size == 0)
-				throw new Exception(
-						"No result id returned for metadata and locid: "
-								+ metadata + ", " + locid + ". Query run was: "
-								+ geo_query);
-
-			resultSet.beforeFirst();
-			resultSet.next();
-
-			JSONObject jo = new JSONObject();
-			jo.put("locID", resultSet.getString("locID"));
-			jo.put("country", resultSet.getString("country"));
-			jo.put("city", resultSet.getString("city"));
-			jo.put("postalCode", resultSet.getString("postalCode"));
-			jo.put("latitude", resultSet.getString("latitude"));
-			jo.put("longitude", resultSet.getString("longitude"));
-			jo.put("dmaCode", resultSet.getString("dmaCode"));
-			jo.put("locID", resultSet.getString("locID"));
-			
-			jo.put("location_point", jo.get("longitude") + "," + jo.get("latitude"));
-
-			_LOG.debug("Returning enrichment: " + jo);
-
-			return jo;
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			_LOG.error("Enrichment failure: " + e);
-			return new JSONObject();
-		}
-	}
-
-	@Override
-	public boolean initializeAdapter() {
-
-		_LOG.info("[Metron] Initializing MysqlAdapter....");
-
-		try {
-
-			Class.forName("com.mysql.jdbc.Driver");
-			connection = DriverManager.getConnection("jdbc:mysql://" + _ip
-					+ "/" + _tablename + "?user=" + _username + "&password="
-					+ _password);
-
-			connection.setReadOnly(true);
-
-			if (!connection.isValid(0))
-				throw new Exception("Invalid connection string....");
-
-			_LOG.info("[Metron] Set JDBC connection....");
-
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-			_LOG.error("[Metron] JDBC connection failed....");
-
-			return false;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
index f8f7257..076594d 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
@@ -25,7 +25,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 
-public abstract class AbstractHostAdapter implements EnrichmentAdapter,Serializable{
+public abstract class AbstractHostAdapter implements EnrichmentAdapter<String>,
+				Serializable{
 
 	/**
 	 * Adapter to attach reputation information to the telemetry message
@@ -37,4 +38,8 @@ public abstract class AbstractHostAdapter implements EnrichmentAdapter,Serializa
 	abstract public boolean initializeAdapter();
 	abstract public JSONObject enrich(String metadata);
 
+	@Override
+	public void cleanup() {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
new file mode 100644
index 0000000..260d878
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
@@ -0,0 +1,50 @@
+package org.apache.metron.enrichment.adapters.host;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Created by rmerriman on 2/2/16.
+ */
+public class HostFromJSONListAdapter extends AbstractHostAdapter {
+
+  Map<String, JSONObject> _known_hosts = new HashMap<>();
+
+  public HostFromJSONListAdapter(String jsonList) {
+    JSONArray jsonArray = (JSONArray) JSONValue.parse(jsonList);
+    Iterator jsonArrayIterator = jsonArray.iterator();
+    while(jsonArrayIterator.hasNext()) {
+      JSONObject jsonObject = (JSONObject) jsonArrayIterator.next();
+      String host = (String) jsonObject.remove("ip");
+      _known_hosts.put(host, jsonObject);
+    }
+  }
+
+  @Override
+  public boolean initializeAdapter()
+  {
+
+    if(_known_hosts.size() > 0)
+      return true;
+    else
+      return false;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public JSONObject enrich(String metadata) {
+
+
+    if(!_known_hosts.containsKey(metadata))
+      return new JSONObject();
+
+    JSONObject enrichment = new JSONObject();
+    enrichment.put("known_info", _known_hosts.get(metadata));
+    return enrichment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java
new file mode 100644
index 0000000..f5ba630
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java
@@ -0,0 +1,53 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+import java.io.Serializable;
+
+public abstract class BaseJdbcConfig implements JdbcConfig, Serializable {
+
+  protected String host;
+  protected int port = -1;
+  protected String username;
+  protected String password;
+  protected String table = "";
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
new file mode 100644
index 0000000..ca4f8ea
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -0,0 +1,61 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.sql.*;
+
+public abstract class JdbcAdapter implements EnrichmentAdapter<String>,
+        Serializable {
+
+  protected static final Logger _LOG = LoggerFactory
+          .getLogger(JdbcAdapter.class);
+
+  protected Connection connection;
+  protected Statement statement;
+
+  private JdbcConfig config;
+  private String host;
+
+  public JdbcAdapter withJdbcConfig(JdbcConfig config) {
+    this.config = config;
+    this.host = config.getHost();
+    return this;
+  }
+
+  @Override
+  public boolean initializeAdapter() {
+    try {
+      if (!InetAddress.getByName(host).isReachable(500)) {
+        throw new Exception("Unable to reach host " + host);
+      }
+      Class.forName(config.getClassName());
+      connection = DriverManager.getConnection(config.getJdbcUrl());
+      connection.setReadOnly(true);
+      if (!connection.isValid(0))
+        throw new Exception("Invalid connection string....");
+      statement = connection.createStatement(
+              ResultSet.TYPE_SCROLL_INSENSITIVE,
+              ResultSet.CONCUR_READ_ONLY);
+      return true;
+    } catch (Exception e) {
+      e.printStackTrace();
+      _LOG.error("[Metron] JDBC connection failed....", e);
+
+      return false;
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      if (statement != null) statement.close();
+      if (connection != null) connection.close();
+    } catch (SQLException e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java
new file mode 100644
index 0000000..cbd6f66
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java
@@ -0,0 +1,9 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+public interface JdbcConfig {
+
+  public String getClassName();
+  public String getJdbcUrl();
+  public String getHost();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java
new file mode 100644
index 0000000..9b8ee79
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java
@@ -0,0 +1,22 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+public class MySqlConfig extends BaseJdbcConfig {
+
+  @Override
+  public String getClassName() {
+    return "com.mysql.jdbc.Driver";
+  }
+
+  @Override
+  public String getJdbcUrl() {
+    StringBuilder url = new StringBuilder();
+    url.append("jdbc:mysql://").append(host);
+    if (port > 0) {
+      url.append(":").append(port);
+    }
+    url.append("/").append(table);
+    url.append("?user=").append(username);
+    url.append("&password=").append(password);
+    return url.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
index d66a7dc..d23a639 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
@@ -33,4 +33,8 @@ public abstract class AbstractThreatAdapter implements EnrichmentAdapter,Seriali
 	
 	abstract public boolean initializeAdapter();
 
+	@Override
+	public void cleanup() {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
index b39cbe5..cfd47c8 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
@@ -18,11 +18,13 @@
 package org.apache.metron.enrichment.adapters.threat;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
@@ -36,10 +38,14 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.log4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("unchecked")
-public class ThreatHbaseAdapter extends AbstractThreatAdapter {
+public class ThreatHbaseAdapter implements EnrichmentAdapter<String>,
+				Serializable {
 
+	protected static final org.slf4j.Logger LOG = LoggerFactory
+					.getLogger(ThreatHbaseAdapter.class);
 	private static final long serialVersionUID = 1L;
 	private String _tableName;
 	private HTableInterface table;
@@ -124,6 +130,8 @@ public class ThreatHbaseAdapter extends AbstractThreatAdapter {
 		return false;
 	}
 
-	
+	@Override
+	public void cleanup() {
 
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
deleted file mode 100644
index 45cf5a0..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.whois;
-
-import java.io.Serializable;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-public abstract class AbstractWhoisAdapter implements EnrichmentAdapter,Serializable{
-
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 8280523289446309728L;
-	protected static final Logger LOG = LoggerFactory
-			.getLogger(AbstractWhoisAdapter.class);
-	
-	abstract public boolean initializeAdapter();
-	abstract public JSONObject enrich(String metadata);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
index 43f4149..bbcd0bd 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.metron.enrichment.adapters.whois;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,16 +29,19 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 
 import com.google.common.base.Joiner;
 import org.apache.metron.tldextractor.BasicTldExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
+public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
+				Serializable {
 
-	/**
-	 * 
-	 */
+	protected static final Logger LOG = LoggerFactory
+					.getLogger(WhoisHBaseAdapter.class);
 	private static final long serialVersionUID = 3371873619030870389L;
 	private HTableInterface table;
 	private String _table_name;
@@ -123,8 +127,13 @@ public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
 		return output;
 
 	}
-	
-//	private String format(String input) {
+
+	@Override
+	public void cleanup() {
+
+	}
+
+	//	private String format(String input) {
 //		String output = input;
 //		String[] tokens = input.split("\\.");
 //		if(tokens.length > 2) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
new file mode 100644
index 0000000..f16c2fb
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -0,0 +1,57 @@
+package org.apache.metron.enrichment.bolt;
+
+import backtype.storm.task.TopologyContext;
+import org.apache.metron.bolt.JoinBolt;
+import org.apache.metron.domain.Enrichment;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
+
+  protected static final Logger LOG = LoggerFactory
+          .getLogger(EnrichmentJoinBolt.class);
+
+  protected List<Enrichment> enrichments;
+
+  /**
+   * @param enrichments A class for sending tuples to enrichment bolt
+   * @return Instance of this class
+   */
+  public EnrichmentJoinBolt withEnrichments(List<Enrichment> enrichments) {
+    this.enrichments = enrichments;
+    return this;
+  }
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+
+  }
+
+  @Override
+  public Set<String> getStreamIds() {
+    Set<String> streamIds = new HashSet<>();
+    for(Enrichment enrichment: enrichments) {
+      streamIds.add(enrichment.getName());
+    }
+    return streamIds;
+  }
+
+
+  @Override
+  public JSONObject joinValues(Map<String, JSONObject> streamValueMap) {
+    JSONObject message = new JSONObject();
+    message.put("message", streamValueMap.get("message"));
+    JSONObject enrichment = new JSONObject();
+    for(String streamId: getStreamIds()) {
+      enrichment.put(streamId, streamValueMap.get(streamId));
+    }
+    message.put("enrichment", enrichment);
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
new file mode 100644
index 0000000..1563652
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.bolt;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import backtype.storm.topology.base.BaseRichBolt;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.metron.domain.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.helpers.topology.ErrorGenerator;
+
+/**
+ * Uses an adapter to enrich telemetry messages with additional metadata
+ * entries. For a list of available enrichment adapters see
+ * org.apache.metron.enrichment.adapters.
+ * <p/>
+ * At the moment of release the following enrichment adapters are available:
+ * <p/>
+ * <ul>
+ * <p/>
+ * <li>geo = attaches geo coordinates to IPs
+ * <li>whois = attaches whois information to domains
+ * <li>host = attaches reputation information to known hosts
+ * <li>CIF = attaches information from threat intelligence feeds
+ * <ul>
+ * <p/>
+ * <p/>
+ * Enrichments are optional
+ **/
+
+@SuppressWarnings({"rawtypes", "serial"})
+public class GenericEnrichmentBolt extends BaseRichBolt {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(GenericEnrichmentBolt.class);
+  private OutputCollector collector;
+
+
+  protected String streamId;
+  protected Enrichment<EnrichmentAdapter> enrichment;
+  protected EnrichmentAdapter adapter;
+  protected transient CacheLoader<String, JSONObject> loader;
+  protected transient LoadingCache<String, JSONObject> cache;
+  protected Long maxCacheSize;
+  protected Long maxTimeRetain;
+
+
+  /**
+   * @param enrichment Object holding enrichment metadata
+   * @return Instance of this class
+   */
+
+  public GenericEnrichmentBolt withEnrichment
+  (Enrichment<EnrichmentAdapter> enrichment) {
+    this.streamId = enrichment.getName();
+    this.enrichment = enrichment;
+    this.adapter = this.enrichment.getAdapter();
+    return this;
+  }
+
+  /**
+   * @param maxCacheSize Maximum size of cache before flushing
+   * @return Instance of this class
+   */
+
+  public GenericEnrichmentBolt withMaxCacheSize(long maxCacheSize) {
+    this.maxCacheSize = maxCacheSize;
+    return this;
+  }
+
+  /**
+   * @param maxTimeRetain Maximum time to retain cached entry before expiring
+   * @return Instance of this class
+   */
+
+  public GenericEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) {
+    this.maxTimeRetain = maxTimeRetain;
+    return this;
+  }
+
+  @Override
+  public void prepare(Map conf, TopologyContext topologyContext,
+                      OutputCollector collector) {
+    this.collector = collector;
+    if (this.enrichment == null)
+      throw new IllegalStateException("enrichment must be specified");
+    if (this.maxCacheSize == null)
+      throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
+    if (this.maxTimeRetain == null)
+      throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
+    if (this.adapter == null)
+      throw new IllegalStateException("Adapter must be specified");
+    if (this.enrichment.getFields() == null)
+      throw new IllegalStateException(
+              "Fields to be enriched must be specified");
+    loader = new CacheLoader<String, JSONObject>() {
+      public JSONObject load(String key) throws Exception {
+        return adapter.enrich(key);
+      }
+    };
+    cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
+            .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+            .build(loader);
+    boolean success = adapter.initializeAdapter();
+    if (!success) {
+      LOG.error("[Metron] EnrichmentBolt could not initialize adapter");
+      throw new IllegalStateException("Could not initialize adapter...");
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declearer) {
+    declearer.declareStream(streamId, new Fields("key", "message"));
+    declearer.declareStream("error", new Fields("message"));
+  }
+
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    String key = tuple.getStringByField("key");
+    JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
+    JSONObject enrichedMessage = new JSONObject();
+    try {
+      if (rawMessage == null || rawMessage.isEmpty())
+        throw new Exception("Could not parse binary stream to JSON");
+      if (key == null)
+        throw new Exception("Key is not valid");
+      for (String field : enrichment.getFields()) {
+        JSONObject enrichedField = new JSONObject();
+        String value = (String) rawMessage.get(field);
+        if (value != null && value.length() != 0) {
+          enrichedField = cache.getUnchecked(value);
+          if (enrichedField == null)
+            throw new Exception("[Metron] Could not enrich string: "
+                    + value);
+        }
+        enrichedMessage.put(field, enrichedField);
+      }
+      if (!enrichedMessage.isEmpty()) {
+        collector.emit(streamId, new Values(key, enrichedMessage));
+      }
+    } catch (Exception e) {
+      LOG.error("[Metron] Unable to enrich message: " + rawMessage);
+      JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + rawMessage, e);
+      if (key != null) {
+        collector.emit(streamId, new Values(key, enrichedMessage));
+      }
+      collector.emit("error", new Values(error));
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    adapter.cleanup();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
deleted file mode 100644
index d3f2fb7..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.common;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-
-import com.codahale.metrics.Counter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.metrics.MetricReporter;
-
-@SuppressWarnings("rawtypes")
-public abstract class AbstractEnrichmentBolt extends BaseRichBolt {
-	/**
-	 * Abstract enrichment bolt
-	 */
-	private static final long serialVersionUID = -6710596708304282838L;
-
-	protected static final Logger LOG = LoggerFactory
-			.getLogger(AbstractEnrichmentBolt.class);
-
-	protected OutputCollector _collector;
-	protected String _OutputFieldName;
-
-	protected String _enrichment_tag;
-	protected Long _MAX_CACHE_SIZE_OBJECTS_NUM;
-	protected Long _MAX_TIME_RETAIN_MINUTES;
-
-	// JSON Keys to be enriched
-	protected List<String> _jsonKeys;
-	protected EnrichmentAdapter _adapter;
-	protected MetricReporter _reporter;
-
-	protected transient CacheLoader<String, JSONObject> loader;
-	protected transient LoadingCache<String, JSONObject> cache;
-
-	protected Counter ackCounter, emitCounter, failCounter;
-
-	protected void registerCounters() {
-
-		String ackString = _adapter.getClass().getSimpleName() + ".ack";
-
-		String emitString = _adapter.getClass().getSimpleName() + ".emit";
-
-		String failString = _adapter.getClass().getSimpleName() + ".fail";
-
-		ackCounter = _reporter.registerCounter(ackString);
-		emitCounter = _reporter.registerCounter(emitString);
-		failCounter = _reporter.registerCounter(failString);
-
-	}
-
-	public final void prepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) {
-		_collector = collector;
-
-		if (this._OutputFieldName == null)
-			throw new IllegalStateException("OutputFieldName must be specified");
-		if (this._enrichment_tag == null)
-			throw new IllegalStateException("enrichment_tag must be specified");
-		if (this._MAX_CACHE_SIZE_OBJECTS_NUM == null)
-			throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
-		if (this._MAX_TIME_RETAIN_MINUTES == null)
-			throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
-		if (this._adapter == null)
-			throw new IllegalStateException("Adapter must be specified");
-		if (this._jsonKeys == null)
-			throw new IllegalStateException(
-					"JSON Keys to be enriched, must be specified");
-
-		loader = new CacheLoader<String, JSONObject>() {
-			public JSONObject load(String key) throws Exception {
-				return _adapter.enrich(key);
-			}
-		};
-
-		cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
-				.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
-				.build(loader);
-
-		boolean success = _adapter.initializeAdapter();
-
-		if (!success) {
-			LOG.error("[Metron] EnrichmentBolt could not initialize adapter");
-			throw new IllegalStateException("Could not initialize adapter...");
-		}
-
-		try {
-			doPrepare(conf, topologyContext, collector);
-		} catch (IOException e) {
-			LOG.error("[Metron] Counld not initialize...");
-			e.printStackTrace();
-		}
-
-	}
-
-	abstract void doPrepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) throws IOException;
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java
deleted file mode 100644
index 5915039..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.common;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.helpers.topology.ErrorGenerator;
-import org.apache.metron.json.serialization.JSONEncoderHelper;
-import org.apache.metron.metrics.MetricReporter;
-
-/**
- * Uses an adapter to enrich telemetry messages with additional metadata
- * entries. For a list of available enrichment adapters see
- * org.apache.metron.enrichment.adapters.
- * <p>
- * At the moment of release the following enrichment adapters are available:
- * <p>
- * <ul>
- * 
- * <li>geo = attaches geo coordinates to IPs
- * <li>whois = attaches whois information to domains
- * <li>host = attaches reputation information to known hosts
- * <li>CIF = attaches information from threat intelligence feeds
- * <ul>
- * <p>
- * <p>
- * Enrichments are optional
- **/
-
-@SuppressWarnings({ "rawtypes", "serial" })
-public class GenericEnrichmentBolt extends AbstractEnrichmentBolt {
-
-	private static final Logger LOG = LoggerFactory
-			.getLogger(GenericEnrichmentBolt.class);
-	private JSONObject metricConfiguration;
-
-	/**
-	 * @param adapter
-	 *            Adapter for doing the enrichment
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withAdapter(EnrichmentAdapter adapter) {
-		_adapter = adapter;
-		return this;
-	}
-
-	/**
-	 * @param OutputFieldName
-	 *            Fieldname of the output tuple for this bolt
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withOutputFieldName(String OutputFieldName) {
-		_OutputFieldName = OutputFieldName;
-		return this;
-	}
-
-	/**
-	 * @param EnrichmentTag
-	 *            Defines what tag the enrichment will be tagged with in the
-	 *            telemetry message
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withEnrichmentTag(String EnrichmentTag) {
-		_enrichment_tag = EnrichmentTag;
-		return this;
-	}
-
-	/**
-	 * @param MAX_CACHE_SIZE_OBJECTS_NUM
-	 *            Maximum size of cache before flushing
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withMaxCacheSize(long MAX_CACHE_SIZE_OBJECTS_NUM) {
-		_MAX_CACHE_SIZE_OBJECTS_NUM = MAX_CACHE_SIZE_OBJECTS_NUM;
-		return this;
-	}
-
-	/**
-	 * @param MAX_TIME_RETAIN_MINUTES
-	 *            Maximum time to retain cached entry before expiring
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withMaxTimeRetain(long MAX_TIME_RETAIN_MINUTES) {
-		_MAX_TIME_RETAIN_MINUTES = MAX_TIME_RETAIN_MINUTES;
-		return this;
-	}
-
-	/**
-	 * @param jsonKeys
-	 *            Keys in the telemetry message that are to be enriched by this
-	 *            bolt
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withKeys(List<String> jsonKeys) {
-		_jsonKeys = jsonKeys;
-		return this;
-	}
-
-	/**
-	 * @param config
-	 *            A class for generating custom metrics into graphite
-	 * @return Instance of this class
-	 */
-
-	public GenericEnrichmentBolt withMetricConfiguration(Configuration config) {
-		this.metricConfiguration = JSONEncoderHelper.getJSON(config
-				.subset("org.apache.metron.metrics"));
-		return this;
-	}
-
-	@SuppressWarnings("unchecked")
-	public void execute(Tuple tuple) {
-
-		LOG.trace("[Metron] Starting enrichment");
-
-		JSONObject in_json = null;
-		String key = null;
-		
-		try {
-
-			key = tuple.getStringByField("key");
-			in_json = (JSONObject) tuple.getValueByField("message");
-
-			if (in_json == null || in_json.isEmpty())
-				throw new Exception("Could not parse binary stream to JSON");
-			
-			if(key == null)
-				throw new Exception("Key is not valid");
-
-			LOG.trace("[Metron] Received tuple: " + in_json);
-
-			JSONObject message = (JSONObject) in_json.get("message");
-
-			if (message == null || message.isEmpty())
-				throw new Exception("Could not extract message from JSON: "
-						+ in_json);
-
-			LOG.trace("[Metron] Extracted message: " + message);
-
-			for (String jsonkey : _jsonKeys) {
-				LOG.trace("[Metron] Processing:" + jsonkey + " within:"
-						+ message);
-
-				String jsonvalue = (String) message.get(jsonkey);
-				LOG.trace("[Metron] Processing: " + jsonkey + " -> "
-						+ jsonvalue);
-
-				if (null == jsonvalue) {
-					LOG.trace("[Metron] Key " + jsonkey
-							+ "not present in message " + message);
-					continue;
-				}
-				
-				// If the field is empty, no need to enrich
-				if ( jsonvalue.length() == 0) {
-					continue;
-				}
-
-				JSONObject enrichment = cache.getUnchecked(jsonvalue);
-				LOG.trace("[Metron] Enriched: " + jsonkey + " -> "
-						+ enrichment);
-
-				if (enrichment == null)
-					throw new Exception("[Metron] Could not enrich string: "
-							+ jsonvalue);
-
-				if (!in_json.containsKey("enrichment")) {
-					in_json.put("enrichment", new JSONObject());
-					LOG.trace("[Metron] Starting a string of enrichments");
-				}
-
-				JSONObject enr1 = (JSONObject) in_json.get("enrichment");
-
-				if (enr1 == null)
-					throw new Exception("Internal enrichment is empty");
-
-				if (!enr1.containsKey(_enrichment_tag)) {
-					enr1.put(_enrichment_tag, new JSONObject());
-					LOG.trace("[Metron] Starting a new enrichment");
-				}
-
-				LOG.trace("[Metron] ENR1 is: " + enr1);
-
-				JSONObject enr2 = (JSONObject) enr1.get(_enrichment_tag);
-				enr2.put(jsonkey, enrichment);
-
-				LOG.trace("[Metron] ENR2 is: " + enr2);
-
-				enr1.put(_enrichment_tag, enr2);
-				in_json.put("enrichment", enr1);
-			}
-
-			LOG.debug("[Metron] Generated combined enrichment: " + in_json);
-
-			_collector.emit("message", new Values(key, in_json));
-			_collector.ack(tuple);
-
-			if (_reporter != null) {
-				emitCounter.inc();
-				ackCounter.inc();
-			}
-		} catch (Exception e) {
-			
-			LOG.error("[Metron] Unable to enrich message: " + in_json);
-			_collector.fail(tuple);
-
-			if (_reporter != null) {
-				failCounter.inc();
-			}
-			
-			JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + in_json, e);
-			_collector.emit("error", new Values(error));
-		}
-		
-		
-
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declearer) {
-		declearer.declareStream("message", new Fields("key", "message"));
-		declearer.declareStream("error", new Fields("message"));
-	}
-
-	@Override
-	void doPrepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) throws IOException {
-		LOG.info("[Metron] Preparing Enrichment Bolt...");
-
-		_collector = collector;
-
-		try {
-			_reporter = new MetricReporter();
-			_reporter.initialize(metricConfiguration,
-					GenericEnrichmentBolt.class);
-			this.registerCounters();
-		} catch (Exception e) {
-			LOG.info("[Metron] Unable to initialize metrics reporting");
-		}
-
-		LOG.info("[Metron] Enrichment bolt initialized...");
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
index a54dc6e..f5b6a24 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
@@ -19,6 +19,9 @@ package org.apache.metron.enrichment.adapters.geo;
 import java.net.URL;
 import java.util.Properties;
 
+import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
+import org.apache.metron.enrichment.adapters.jdbc.JdbcConfig;
+import org.apache.metron.enrichment.adapters.jdbc.MySqlConfig;
 import org.json.simple.JSONObject;
 
 import org.apache.metron.test.AbstractSchemaTest;
@@ -35,7 +38,7 @@ import org.junit.Assert;
  */
 public class GeoMysqlAdapterTest extends AbstractSchemaTest {
 
-    private static GeoMysqlAdapter geoMySqlAdapter=null;
+    private static JdbcAdapter geoMySqlAdapter=null;
     private static boolean connected=false;
 
     /**
@@ -75,7 +78,14 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
             System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
             return;//skip tests
        }else{
-           GeoMysqlAdapterTest.setGeoMySqlAdapter(new GeoMysqlAdapter((String)prop.get("mysql.ip"), (new Integer((String)prop.get("mysql.port"))).intValue(),(String)prop.get("mysql.username"),(String)prop.get("mysql.password"), (String)prop.get("bolt.enrichment.geo.adapter.table")));
+          MySqlConfig mySqlConfig = new MySqlConfig();
+          mySqlConfig.setHost((String)prop.get("mysql.ip"));
+          mySqlConfig.setPort(new Integer((String) prop.get("mysql.port")));
+          mySqlConfig.setUsername((String)prop.get("mysql.username"));
+          mySqlConfig.setPassword((String)prop.get("mysql.password"));
+          mySqlConfig.setTable((String)prop.get("bolt.enrichment.geo.adapter.table"));
+          JdbcAdapter geoAdapter = new GeoAdapter().withJdbcConfig(mySqlConfig);
+           GeoMysqlAdapterTest.setGeoMySqlAdapter(geoAdapter);
            connected =geoMySqlAdapter.initializeAdapter();
            Assert.assertTrue(connected);
            URL schema_url = getClass().getClassLoader().getResource(
@@ -95,7 +105,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
     }
 
     /**
-     * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter#enrich(java.lang.String)}.
+     * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoAdapter#enrich(java.lang.String)}.
      */
     public void testEnrich() {
         if(skipTests(this.getMode())){
@@ -123,7 +133,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
     }
 
     /**
-     * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter#initializeAdapter()}.
+     * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoAdapter#initializeAdapter()}.
      */
     public void testInitializeAdapter() {
         if(skipTests(this.getMode())){
@@ -135,7 +145,13 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
     }
  
     /**
-     * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter#GeoMysqlAdapter(java.lang.String, int, java.lang.String, java.lang.String, java.lang.String)}.
+     * Test method for
+     *
+     *
+     *
+     *
+     *
+     * {@link org.apache.metron.enrichment.adapters.geo.GeoAdapter}.
      */
     public void testGeoMysqlAdapter() {
         if(skipTests(this.getMode())){
@@ -150,7 +166,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
      * @return the geoMySqlAdapter.
      */
     
-    public static GeoMysqlAdapter getGeoMySqlAdapter() {
+    public static JdbcAdapter getGeoMySqlAdapter() {
         return geoMySqlAdapter;
     }
 
@@ -159,7 +175,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
      * @param geoMySqlAdapter the geoMySqlAdapter.
      */
     
-    public static void setGeoMySqlAdapter(GeoMysqlAdapter geoMySqlAdapter) {
+    public static void setGeoMySqlAdapter(JdbcAdapter geoMySqlAdapter) {
     
         GeoMysqlAdapterTest.geoMySqlAdapter = geoMySqlAdapter;
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
index d36021f..4de84a2 100644
--- a/metron-streaming/Metron-MessageParsers/pom.xml
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -29,6 +29,11 @@
 			<version>${project.parent.version}</version>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.metron</groupId>
+			<artifactId>Metron-EnrichmentAdapters</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-core</artifactId>
 			<version>${global_storm_version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java
new file mode 100644
index 0000000..49c80d7
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java
@@ -0,0 +1,27 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+
+public class PcapParserBolt extends TelemetryParserBolt {
+
+  @Override
+  public void declareOther(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("raw", new Fields("key", "value", "timestamp") );
+  }
+
+  @Override
+  public void emitOther(Tuple tuple, List<JSONObject> messages) {
+    for(JSONObject message: messages) {
+      String key = (String) message.get("pcap_id");
+      long timestamp = (long) message.get("ts_micro");
+      collector.emit("raw", tuple, new Values(key, tuple.getBinary(0),
+              timestamp));
+    }
+  }
+}