You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/10 21:17:42 UTC
[4/4] incubator-metron git commit: METRON-33 Execute Enrichments in
Parallel (merrimanr via cestella) closes apache/incubator-metron#19
METRON-33 Execute Enrichments in Parallel (merrimanr via cestella) closes apache/incubator-metron#19
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/905b09f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/905b09f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/905b09f6
Branch: refs/heads/master
Commit: 905b09f629a936fedf1fb5468d5d3ce357bcd9d2
Parents: 973b1bb
Author: merrimanr <me...@gmail.com>
Authored: Wed Feb 10 15:17:11 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Wed Feb 10 15:17:11 2016 -0500
----------------------------------------------------------------------
.../java/org/apache/metron/bolt/JoinBolt.java | 105 ++++++
.../java/org/apache/metron/bolt/SplitBolt.java | 83 +++++
.../org/apache/metron/domain/Enrichment.java | 37 ++
.../interfaces/EnrichmentAdapter.java | 5 +-
.../metron/parser/interfaces/MessageFilter.java | 7 +-
.../metron/parser/interfaces/MessageParser.java | 10 +-
.../java/org/apache/metron/pcap/PcapParser.java | 183 ----------
.../adapters/cif/AbstractCIFAdapter.java | 4 +
.../adapters/cif/CIFHbaseAdapter.java | 14 +-
.../adapters/geo/AbstractGeoAdapter.java | 61 ----
.../enrichment/adapters/geo/GeoAdapter.java | 54 +++
.../adapters/geo/GeoMysqlAdapter.java | 186 ----------
.../adapters/host/AbstractHostAdapter.java | 7 +-
.../adapters/host/HostFromJSONListAdapter.java | 50 +++
.../adapters/jdbc/BaseJdbcConfig.java | 53 +++
.../enrichment/adapters/jdbc/JdbcAdapter.java | 61 ++++
.../enrichment/adapters/jdbc/JdbcConfig.java | 9 +
.../enrichment/adapters/jdbc/MySqlConfig.java | 22 ++
.../adapters/threat/AbstractThreatAdapter.java | 4 +
.../adapters/threat/ThreatHbaseAdapter.java | 12 +-
.../adapters/whois/AbstractWhoisAdapter.java | 40 ---
.../adapters/whois/WhoisHBaseAdapter.java | 21 +-
.../enrichment/bolt/EnrichmentJoinBolt.java | 57 +++
.../enrichment/bolt/GenericEnrichmentBolt.java | 186 ++++++++++
.../common/AbstractEnrichmentBolt.java | 128 -------
.../common/GenericEnrichmentBolt.java | 279 ---------------
.../adapters/geo/GeoMysqlAdapterTest.java | 30 +-
metron-streaming/Metron-MessageParsers/pom.xml | 5 +
.../org/apache/metron/bolt/PcapParserBolt.java | 27 ++
.../apache/metron/bolt/TelemetryParserBolt.java | 137 +++++++
.../apache/metron/filters/BroMessageFilter.java | 19 +-
.../metron/filters/GenericMessageFilter.java | 10 +-
.../metron/parsing/AbstractParserBolt.java | 144 --------
.../apache/metron/parsing/PcapParserBolt.java | 227 ------------
.../metron/parsing/TelemetryParserBolt.java | 222 ------------
.../metron/parsing/parsers/AbstractParser.java | 48 ---
.../metron/parsing/parsers/BasicBroParser.java | 23 +-
.../parsing/parsers/BasicFireEyeParser.java | 86 ++---
.../metron/parsing/parsers/BasicIseParser.java | 41 ++-
.../parsing/parsers/BasicLancopeParser.java | 32 +-
.../parsing/parsers/BasicLogstashParser.java | 26 +-
.../parsers/BasicPaloAltoFirewallParser.java | 355 ++++++++++---------
.../metron/parsing/parsers/BasicParser.java | 54 +++
.../parsing/parsers/BasicSnortParser.java | 36 +-
.../parsing/parsers/BasicSourcefireParser.java | 32 +-
.../metron/parsing/parsers/BasicYafParser.java | 18 +-
.../metron/parsing/parsers/GrokAsaParser.java | 41 +--
.../parsing/parsers/GrokSourcefireParser.java | 25 +-
.../metron/parsing/parsers/GrokUtils.java | 26 --
.../metron/parsing/parsers/JSONCleaner.java | 4 +-
.../metron/parsing/parsers/MetronGrok.java | 1 +
.../metron/parsing/parsers/ParserUtils.java | 23 --
.../metron/parsing/parsers/PcapParser.java | 89 +++--
.../apache/metron/parsing/utils/GrokUtils.java | 26 ++
.../metron/parsing/utils/ParserUtils.java | 55 +++
.../metron/parsing/test/BasicBroParserTest.java | 128 +++----
.../parsing/test/BasicFireEyeParserTest.java | 10 +-
.../metron/parsing/test/BasicIseParserTest.java | 4 +-
.../parsing/test/BasicLancopeParserTest.java | 2 +-
.../test/BasicPaloAltoFirewallParserTest.java | 5 +-
.../parsing/test/BasicSourcefireParserTest.java | 4 +-
.../metron/parsing/test/BroParserTest.java | 2 +-
.../metron/parsing/test/GrokAsaParserTest.java | 4 +-
.../test/spouts/GenericInternalTestSpout.java | 8 +-
.../etc/whitelists/known_hosts.conf | 3 +-
.../Metron_Configs/topologies/asa/local.yaml | 187 +++++-----
.../Metron_Configs/topologies/asa/remote.yaml | 191 +++++-----
.../Metron_Configs/topologies/bro/local.yaml | 11 +-
.../Metron_Configs/topologies/bro/remote.yaml | 11 +-
.../topologies/fireeye/local.yaml | 187 +++++-----
.../topologies/fireeye/remote.yaml | 191 +++++-----
.../Metron_Configs/topologies/ise/local.yaml | 11 +-
.../Metron_Configs/topologies/ise/remote.yaml | 11 +-
.../topologies/lancope/local.yaml | 163 ++++++---
.../topologies/lancope/remote.yaml | 167 ++++++---
.../topologies/paloalto/local.yaml | 11 +-
.../topologies/paloalto/remote.yaml | 11 +-
.../Metron_Configs/topologies/pcap/local.yaml | 121 ++++++-
.../Metron_Configs/topologies/pcap/remote.yaml | 136 ++++++-
.../Metron_Configs/topologies/snort/local.yaml | 13 +-
.../Metron_Configs/topologies/snort/remote.yaml | 11 +-
.../topologies/sourcefire/local.yaml | 187 +++++-----
.../topologies/sourcefire/remote.yaml | 191 +++++-----
.../Metron_Configs/topologies/yaf/local.yaml | 11 +-
.../Metron_Configs/topologies/yaf/remote.yaml | 11 +-
.../integration/pcap/PcapIntegrationTest.java | 57 +--
.../util/integration/ComponentRunner.java | 2 +-
.../integration/util/mock/MockGeoAdapter.java | 31 ++
88 files changed, 2843 insertions(+), 2790 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
new file mode 100644
index 0000000..aec3fd0
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
@@ -0,0 +1,105 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class JoinBolt<V> extends BaseRichBolt {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(JoinBolt.class);
+ protected OutputCollector collector;
+ protected ImmutableSet<String> streamIds;
+
+ protected transient CacheLoader<String, Map<String, V>> loader;
+ protected transient LoadingCache<String, Map<String, V>> cache;
+ protected Long maxCacheSize;
+ protected Long maxTimeRetain;
+
+ public JoinBolt withMaxCacheSize(long maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ return this;
+ }
+
+ public JoinBolt withMaxTimeRetain(long maxTimeRetain) {
+ this.maxTimeRetain = maxTimeRetain;
+ return this;
+ }
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ this.collector = outputCollector;
+ if (this.maxCacheSize == null)
+ throw new IllegalStateException("maxCacheSize must be specified");
+ if (this.maxTimeRetain == null)
+ throw new IllegalStateException("maxTimeRetain must be specified");
+ loader = new CacheLoader<String, Map<String, V>>() {
+ public Map<String, V> load(String key) throws Exception {
+ return new HashMap<>();
+ }
+ };
+ cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
+ .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+ .build(loader);
+ Set<String> temp = getStreamIds();
+ temp.add("message");
+ streamIds = ImmutableSet.copyOf(temp);
+ prepare(map, topologyContext);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void execute(Tuple tuple) {
+ String streamId = tuple.getSourceStreamId();
+ String key = (String) tuple.getValueByField("key");
+ V value = (V) tuple.getValueByField("message");
+ try {
+ Map<String, V> streamValueMap = cache.get(key);
+ if (streamValueMap.containsKey(streamId)) {
+ LOG.warn(String.format("Received key %s twice for " +
+ "stream %s", key, streamId));
+ }
+ streamValueMap.put(streamId, value);
+ Set<String> streamValueKeys = streamValueMap.keySet();
+ if (streamValueKeys.size() == streamIds.size() && Sets.symmetricDifference
+ (streamValueKeys, streamIds)
+ .isEmpty()) {
+ collector.emit("message", tuple, new Values(key, joinValues
+ (streamValueMap)));
+ collector.ack(tuple);
+ cache.invalidate(key);
+ } else {
+ cache.put(key, streamValueMap);
+ }
+ } catch (ExecutionException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("message", new Fields("key", "message"));
+ }
+
+ public abstract void prepare(Map map, TopologyContext topologyContext);
+
+ public abstract Set<String> getStreamIds();
+
+ public abstract V joinValues(Map<String, V> streamValueMap);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
new file mode 100644
index 0000000..ad4edb7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
@@ -0,0 +1,83 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public abstract class SplitBolt<T> extends
+ BaseRichBolt {
+
+ protected OutputCollector collector;
+ private Set<String> streamIds;
+
+ @Override
+ public final void prepare(Map map, TopologyContext topologyContext,
+ OutputCollector outputCollector) {
+ collector = outputCollector;
+ streamIds = ImmutableSet.copyOf(getStreamIds());
+ prepare(map, topologyContext);
+ }
+
+ @Override
+ public final void execute(Tuple tuple) {
+ emit(tuple, generateMessages(tuple));
+ }
+
+ @Override
+ public final void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("message", new Fields("key", "message"));
+ for (String streamId : getStreamIds()) {
+ declarer.declareStream(streamId, new Fields("key", "message"));
+ }
+ declarer.declareStream("error", new Fields("message"));
+ declareOther(declarer);
+ }
+
+ public void emit(Tuple tuple, List<T> messages) {
+ for(T message: messages) {
+ String key = getKey(tuple, message);
+ collector.emit("message", tuple, new Values(key, message));
+ Map<String, T> streamValueMap = splitMessage(message);
+ for (String streamId : streamIds) {
+ T streamValue = streamValueMap.get(streamId);
+ if (streamValue == null) {
+ streamValue = getDefaultValue(streamId);
+ }
+ collector.emit(streamId, new Values(key, streamValue));
+ }
+ collector.ack(tuple);
+ }
+ emitOther(tuple, messages);
+ }
+
+ protected T getDefaultValue(String streamId) {
+ throw new IllegalArgumentException("Could not find a message for" +
+ " stream: " + streamId);
+ }
+
+ public abstract void prepare(Map map, TopologyContext topologyContext);
+
+ public abstract Set<String> getStreamIds();
+
+ public abstract String getKey(Tuple tuple, T message);
+
+ public abstract List<T> generateMessages(Tuple tuple);
+
+ public abstract Map<String, T> splitMessage(T message);
+
+ public abstract void declareOther(OutputFieldsDeclarer declarer);
+
+ public abstract void emitOther(Tuple tuple, List<T> messages);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
new file mode 100644
index 0000000..8567357
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
@@ -0,0 +1,37 @@
+package org.apache.metron.domain;
+
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
+
+ private String name;
+ private List<String> fields;
+ private T adapter;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+
+ public void setFields(List<String> fields) {
+ this.fields = fields;
+ }
+
+ public T getAdapter() {
+ return adapter;
+ }
+
+ public void setAdapter(T adapter) {
+ this.adapter = adapter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
index c0eb0e0..8eb5937 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -19,8 +19,9 @@ package org.apache.metron.enrichment.interfaces;
import org.json.simple.JSONObject;
-public interface EnrichmentAdapter
+public interface EnrichmentAdapter<T>
{
- JSONObject enrich(String metadata);
+ JSONObject enrich(T value);
boolean initializeAdapter();
+ void cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
index 88a3543..a3c0089 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageFilter.java
@@ -1,10 +1,7 @@
package org.apache.metron.parser.interfaces;
-import org.json.simple.JSONObject;
+public interface MessageFilter<T> {
-
-public interface MessageFilter {
-
- public boolean emitTuple(JSONObject message);
+ boolean emitTuple(T message);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
index f09484a..a4fd2d8 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/parser/interfaces/MessageParser.java
@@ -1,11 +1,11 @@
package org.apache.metron.parser.interfaces;
-import org.json.simple.JSONObject;
+import java.util.List;
+
+public interface MessageParser<T> {
-public interface MessageParser {
-
- void initializeParser();
void init();
- JSONObject parse(byte[] raw_message);
+ List<T> parse(byte[] rawMessage);
+ boolean validate(T message);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java
deleted file mode 100644
index 18874c2..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapParser.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package org.apache.metron.pcap;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
-import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
-import org.krakenapps.pcap.decoder.ethernet.EthernetType;
-import org.krakenapps.pcap.decoder.ip.IpDecoder;
-import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
-import org.krakenapps.pcap.decoder.tcp.TcpPacket;
-import org.krakenapps.pcap.decoder.udp.UdpPacket;
-import org.krakenapps.pcap.file.GlobalHeader;
-import org.krakenapps.pcap.packet.PacketHeader;
-import org.krakenapps.pcap.packet.PcapPacket;
-import org.krakenapps.pcap.util.Buffer;
-
-// TODO: Auto-generated Javadoc
-/**
- * The Class PcapParser.
- *
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public final class PcapParser {
-
- /** The Constant LOG. */
- private static final Logger LOG = Logger.getLogger(PcapParser.class);
-
- /** The ETHERNET_DECODER. */
- private static final EthernetDecoder ETHERNET_DECODER = new EthernetDecoder();
-
- /** The ip decoder. */
- private static final IpDecoder IP_DECODER = new IpDecoder();
-
- // /** The tcp decoder. */
- // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
- // TcpPortProtocolMapper());
- //
- // /** The udp decoder. */
- // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
- // UdpPortProtocolMapper());
-
- static {
- // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
- // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
- ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
- }
-
- /**
- * Instantiates a new pcap parser.
- */
- private PcapParser() { // $codepro.audit.disable emptyMethod
-
- }
-
- /**
- * Parses the.
- *
- * @param tcpdump
- * the tcpdump
- * @return the list * @throws IOException Signals that an I/O exception has
- * occurred. * @throws IOException * @throws IOException * @throws
- * IOException
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- public static List<PacketInfo> parse(byte[] tcpdump) throws IOException {
- List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
-
- PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(tcpdump);
-
- GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
- while (true) {
- try
-
- {
- PcapPacket packet = pcapByteInputStream.getPacket();
- // int packetCounter = 0;
- // PacketHeader packetHeader = null;
- // Ipv4Packet ipv4Packet = null;
- TcpPacket tcpPacket = null;
- UdpPacket udpPacket = null;
- // Buffer packetDataBuffer = null;
- int sourcePort = 0;
- int destinationPort = 0;
-
- // LOG.trace("Got packet # " + ++packetCounter);
-
- // LOG.trace(packet.getPacketData());
- ETHERNET_DECODER.decode(packet);
-
- PacketHeader packetHeader = packet.getPacketHeader();
- Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
-
- if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
- tcpPacket = TcpPacket.parse(ipv4Packet);
-
- }
-
- if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
-
- Buffer packetDataBuffer = ipv4Packet.getData();
- sourcePort = packetDataBuffer.getUnsignedShort();
- destinationPort = packetDataBuffer.getUnsignedShort();
-
- udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
-
- udpPacket.setLength(packetDataBuffer.getUnsignedShort());
- udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
- packetDataBuffer.discardReadBytes();
- udpPacket.setData(packetDataBuffer);
- }
-
- packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
- ipv4Packet, tcpPacket, udpPacket));
- } catch (NegativeArraySizeException ignored) {
- LOG.debug("Ignorable exception while parsing packet.", ignored);
- } catch (EOFException eof) { // $codepro.audit.disable logExceptions
- // Ignore exception and break
- break;
- }
- }
- return packetInfoList;
- }
-
- /**
- * The main method.
- *
- * @param args
- * the arguments
- * @throws IOException
- * Signals that an I/O exception has occurred.
- * @throws InterruptedException
- * the interrupted exception
- */
- public static void main(String[] args) throws IOException,
- InterruptedException {
-
- double totalIterations = 1000000;
- double parallelism = 64;
- double targetEvents = 1000000;
-
- File fin = new File("/Users/sheetal/Downloads/udp.pcap");
- File fout = new File(fin.getAbsolutePath() + ".parsed");
- byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < totalIterations; i++) {
- List<PacketInfo> list = parse(pcapBytes);
-
- for (PacketInfo packetInfo : list) {
- // FileUtils.writeStringToFile(fout, packetInfo.getJsonDoc(), true);
- // FileUtils.writeStringToFile(fout, "\n", true);
- // System.out.println(packetInfo.getJsonDoc());
- }
- }
- long endTime = System.currentTimeMillis();
-
- System.out.println("Time taken to process " + totalIterations + " events :"
- + (endTime - startTime) + " milliseconds");
-
- System.out
- .println("With parallelism of "
- + parallelism
- + " estimated time to process "
- + targetEvents
- + " events: "
- + (((((endTime - startTime) / totalIterations) * targetEvents) / parallelism) / 1000)
- + " seconds");
- System.out.println("With parallelism of " + parallelism
- + " estimated # of events per second: "
- + ((parallelism * 1000 * totalIterations) / (endTime - startTime))
- + " events");
- System.out.println("Expected Parallelism to process " + targetEvents
- + " events in a second: "
- + (targetEvents / ((1000 * totalIterations) / (endTime - startTime))));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
index 1eff2eb..57c60e5 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
@@ -38,4 +38,8 @@ public abstract class AbstractCIFAdapter implements EnrichmentAdapter,Serializab
abstract public String enrichByDomain(String metadata);
abstract public String enrichByEmail(String metadata);
+ @Override
+ public void cleanup() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index 33b0350..bfe9ef6 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -18,9 +18,11 @@
package org.apache.metron.enrichment.adapters.cif;
import java.io.IOException;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -33,7 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.log4j.Logger;
@SuppressWarnings("unchecked")
-public class CIFHbaseAdapter extends AbstractCIFAdapter {
+public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
private static final long serialVersionUID = 1L;
private String _tableName;
@@ -107,19 +109,23 @@ public class CIFHbaseAdapter extends AbstractCIFAdapter {
return false;
}
- @Override
+
public String enrichByIP(String metadata) {
return null;
}
- @Override
+
public String enrichByDomain(String metadata) {
return null;
}
- @Override
+
public String enrichByEmail(String metadata) {
return null;
}
+ @Override
+ public void cleanup() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
deleted file mode 100644
index e0ca1cd..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.geo;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.enrichment.common.GenericEnrichmentBolt;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-@SuppressWarnings("serial")
-public abstract class AbstractGeoAdapter implements EnrichmentAdapter,
- Serializable {
-
- protected static final Logger _LOG = LoggerFactory
- .getLogger(GenericEnrichmentBolt.class);
-
- abstract public JSONObject enrich(String metadata);
-
- abstract public boolean initializeAdapter();
-
- /**
- * Check if we can reach the IP where geo data is storred
- *
- * @param ip
- * - ip of geo database
- * @param timeout
- * - timeout for a connection attempt
- * @return - True if can connect, False if cannot
- * @throws Exception
- */
- public boolean checkIfReachable(String ip, int timeout) throws Exception {
- boolean reachable = InetAddress.getByName(ip).isReachable(timeout);
-
- if (!reachable)
- return false;
-
- return true;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
new file mode 100644
index 0000000..89721a7
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -0,0 +1,54 @@
+package org.apache.metron.enrichment.adapters.geo;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
+import org.json.simple.JSONObject;
+
+import java.net.InetAddress;
+import java.sql.ResultSet;
+
+public class GeoAdapter extends JdbcAdapter {
+
+ private InetAddressValidator ipvalidator = new InetAddressValidator();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public JSONObject enrich(String value) {
+ JSONObject enriched = new JSONObject();
+ try {
+ InetAddress addr = InetAddress.getByName(value);
+ if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
+ || addr.isSiteLocalAddress() || addr.isMulticastAddress()
+ || !ipvalidator.isValidInet4Address(value)) {
+ return new JSONObject();
+ }
+ String locidQuery = "select IPTOLOCID(\"" + value
+ + "\") as ANS";
+ ResultSet resultSet = statement.executeQuery(locidQuery);
+ String locid = null;
+ if (resultSet.next()) {
+ locid = resultSet.getString("ANS");
+ }
+ resultSet.close();
+ if (locid == null) return new JSONObject();
+ String geoQuery = "select * from location where locID = " + locid;
+ resultSet = statement.executeQuery(geoQuery);
+ if (resultSet.next()) {
+ enriched.put("locID", resultSet.getString("locID"));
+ enriched.put("country", resultSet.getString("country"));
+ enriched.put("city", resultSet.getString("city"));
+ enriched.put("postalCode", resultSet.getString("postalCode"));
+ enriched.put("latitude", resultSet.getString("latitude"));
+ enriched.put("longitude", resultSet.getString("longitude"));
+ enriched.put("dmaCode", resultSet.getString("dmaCode"));
+ enriched.put("location_point", enriched.get("longitude") + "," + enriched.get("latitude"));
+ }
+ resultSet.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ _LOG.error("Enrichment failure: " + e);
+ return new JSONObject();
+ }
+ return enriched;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
deleted file mode 100644
index 91f2fac..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.geo;
-
-import java.net.InetAddress;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-
-import org.apache.commons.validator.routines.InetAddressValidator;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings("serial")
-public class GeoMysqlAdapter extends AbstractGeoAdapter {
-
- private Connection connection = null;
- private Statement statement = null;
- private String _ip;
- private String _username;
- private String _password;
- private String _tablename;
- private InetAddressValidator ipvalidator = new InetAddressValidator();
-
- public GeoMysqlAdapter(String ip, int port, String username,
- String password, String tablename) {
- try {
- _ip = InetAddress.getByName(ip).getHostAddress();
-
- boolean reachable = checkIfReachable(ip, 500);
-
- if (!reachable)
- throw new Exception("Unable to reach IP " + _ip
- + " with username " + _username + " and password "
- + _password + " accessing table name " + _tablename);
-
- } catch (Exception e) {
- _LOG.error("Environment misconfigured, cannot reach MYSQL server....");
- e.printStackTrace();
- }
-
- _username = username;
- _password = password;
- _tablename = tablename;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public JSONObject enrich(String metadata) {
-
- ResultSet resultSet = null;
-
- try {
-
- _LOG.trace("[Metron] Received metadata: " + metadata);
-
- InetAddress addr = InetAddress.getByName(metadata);
-
- if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
- || addr.isSiteLocalAddress() || addr.isMulticastAddress()
- || !ipvalidator.isValidInet4Address(metadata)) {
- _LOG.trace("[Metron] Not a remote IP: " + metadata);
- _LOG.trace("[Metron] Returning enrichment: " + "{}");
-
- return new JSONObject();
- }
-
- _LOG.trace("[Metron] Is a valid remote IP: " + metadata);
-
- statement = connection.createStatement(
- ResultSet.TYPE_SCROLL_INSENSITIVE,
- ResultSet.CONCUR_READ_ONLY);
- String locid_query = "select IPTOLOCID(\"" + metadata
- + "\") as ANS";
- resultSet = statement.executeQuery(locid_query);
-
- if (resultSet == null)
- throw new Exception("Invalid result set for metadata: "
- + metadata + ". Query run was: " + locid_query);
-
- resultSet.last();
- int size = resultSet.getRow();
-
- if (size == 0)
- throw new Exception("No result returned for: " + metadata
- + ". Query run was: " + locid_query);
-
- resultSet.beforeFirst();
- resultSet.next();
-
- String locid = null;
- locid = resultSet.getString("ANS");
-
- if (locid == null)
- throw new Exception("Invalid location id for: " + metadata
- + ". Query run was: " + locid_query);
-
- String geo_query = "select * from location where locID = " + locid
- + ";";
- resultSet = statement.executeQuery(geo_query);
-
- if (resultSet == null)
- throw new Exception(
- "Invalid result set for metadata and locid: "
- + metadata + ", " + locid + ". Query run was: "
- + geo_query);
-
- resultSet.last();
- size = resultSet.getRow();
-
- if (size == 0)
- throw new Exception(
- "No result id returned for metadata and locid: "
- + metadata + ", " + locid + ". Query run was: "
- + geo_query);
-
- resultSet.beforeFirst();
- resultSet.next();
-
- JSONObject jo = new JSONObject();
- jo.put("locID", resultSet.getString("locID"));
- jo.put("country", resultSet.getString("country"));
- jo.put("city", resultSet.getString("city"));
- jo.put("postalCode", resultSet.getString("postalCode"));
- jo.put("latitude", resultSet.getString("latitude"));
- jo.put("longitude", resultSet.getString("longitude"));
- jo.put("dmaCode", resultSet.getString("dmaCode"));
- jo.put("locID", resultSet.getString("locID"));
-
- jo.put("location_point", jo.get("longitude") + "," + jo.get("latitude"));
-
- _LOG.debug("Returning enrichment: " + jo);
-
- return jo;
-
- } catch (Exception e) {
- e.printStackTrace();
- _LOG.error("Enrichment failure: " + e);
- return new JSONObject();
- }
- }
-
- @Override
- public boolean initializeAdapter() {
-
- _LOG.info("[Metron] Initializing MysqlAdapter....");
-
- try {
-
- Class.forName("com.mysql.jdbc.Driver");
- connection = DriverManager.getConnection("jdbc:mysql://" + _ip
- + "/" + _tablename + "?user=" + _username + "&password="
- + _password);
-
- connection.setReadOnly(true);
-
- if (!connection.isValid(0))
- throw new Exception("Invalid connection string....");
-
- _LOG.info("[Metron] Set JDBC connection....");
-
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- _LOG.error("[Metron] JDBC connection failed....");
-
- return false;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
index f8f7257..076594d 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
@@ -25,7 +25,8 @@ import org.slf4j.LoggerFactory;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-public abstract class AbstractHostAdapter implements EnrichmentAdapter,Serializable{
+public abstract class AbstractHostAdapter implements EnrichmentAdapter<String>,
+ Serializable{
/**
* Adapter to attach reputation information to the telemetry message
@@ -37,4 +38,8 @@ public abstract class AbstractHostAdapter implements EnrichmentAdapter,Serializa
abstract public boolean initializeAdapter();
abstract public JSONObject enrich(String metadata);
+ @Override
+ public void cleanup() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
new file mode 100644
index 0000000..260d878
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
@@ -0,0 +1,50 @@
+package org.apache.metron.enrichment.adapters.host;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Created by rmerriman on 2/2/16.
+ */
+public class HostFromJSONListAdapter extends AbstractHostAdapter {
+
+ Map<String, JSONObject> _known_hosts = new HashMap<>();
+
+ public HostFromJSONListAdapter(String jsonList) {
+ JSONArray jsonArray = (JSONArray) JSONValue.parse(jsonList);
+ Iterator jsonArrayIterator = jsonArray.iterator();
+ while(jsonArrayIterator.hasNext()) {
+ JSONObject jsonObject = (JSONObject) jsonArrayIterator.next();
+ String host = (String) jsonObject.remove("ip");
+ _known_hosts.put(host, jsonObject);
+ }
+ }
+
+ @Override
+ public boolean initializeAdapter()
+ {
+
+ if(_known_hosts.size() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public JSONObject enrich(String metadata) {
+
+
+ if(!_known_hosts.containsKey(metadata))
+ return new JSONObject();
+
+ JSONObject enrichment = new JSONObject();
+ enrichment.put("known_info", _known_hosts.get(metadata));
+ return enrichment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java
new file mode 100644
index 0000000..f5ba630
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/BaseJdbcConfig.java
@@ -0,0 +1,53 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+import java.io.Serializable;
+
+public abstract class BaseJdbcConfig implements JdbcConfig, Serializable {
+
+ protected String host;
+ protected int port = -1;
+ protected String username;
+ protected String password;
+ protected String table = "";
+
+ @Override
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
new file mode 100644
index 0000000..ca4f8ea
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -0,0 +1,61 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.sql.*;
+
+public abstract class JdbcAdapter implements EnrichmentAdapter<String>,
+ Serializable {
+
+ protected static final Logger _LOG = LoggerFactory
+ .getLogger(JdbcAdapter.class);
+
+ protected Connection connection;
+ protected Statement statement;
+
+ private JdbcConfig config;
+ private String host;
+
+ public JdbcAdapter withJdbcConfig(JdbcConfig config) {
+ this.config = config;
+ this.host = config.getHost();
+ return this;
+ }
+
+ @Override
+ public boolean initializeAdapter() {
+ try {
+ if (!InetAddress.getByName(host).isReachable(500)) {
+ throw new Exception("Unable to reach host " + host);
+ }
+ Class.forName(config.getClassName());
+ connection = DriverManager.getConnection(config.getJdbcUrl());
+ connection.setReadOnly(true);
+ if (!connection.isValid(0))
+ throw new Exception("Invalid connection string....");
+ statement = connection.createStatement(
+ ResultSet.TYPE_SCROLL_INSENSITIVE,
+ ResultSet.CONCUR_READ_ONLY);
+ return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ _LOG.error("[Metron] JDBC connection failed....", e);
+
+ return false;
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ if (statement != null) statement.close();
+ if (connection != null) connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java
new file mode 100644
index 0000000..cbd6f66
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcConfig.java
@@ -0,0 +1,9 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+public interface JdbcConfig {
+
+ public String getClassName();
+ public String getJdbcUrl();
+ public String getHost();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java
new file mode 100644
index 0000000..9b8ee79
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/MySqlConfig.java
@@ -0,0 +1,22 @@
+package org.apache.metron.enrichment.adapters.jdbc;
+
+public class MySqlConfig extends BaseJdbcConfig {
+
+ @Override
+ public String getClassName() {
+ return "com.mysql.jdbc.Driver";
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ StringBuilder url = new StringBuilder();
+ url.append("jdbc:mysql://").append(host);
+ if (port > 0) {
+ url.append(":").append(port);
+ }
+ url.append("/").append(table);
+ url.append("?user=").append(username);
+ url.append("&password=").append(password);
+ return url.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
index d66a7dc..d23a639 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
@@ -33,4 +33,8 @@ public abstract class AbstractThreatAdapter implements EnrichmentAdapter,Seriali
abstract public boolean initializeAdapter();
+ @Override
+ public void cleanup() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
index b39cbe5..cfd47c8 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
@@ -18,11 +18,13 @@
package org.apache.metron.enrichment.adapters.threat;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
@@ -36,10 +38,14 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.log4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("unchecked")
-public class ThreatHbaseAdapter extends AbstractThreatAdapter {
+public class ThreatHbaseAdapter implements EnrichmentAdapter<String>,
+ Serializable {
+ protected static final org.slf4j.Logger LOG = LoggerFactory
+ .getLogger(ThreatHbaseAdapter.class);
private static final long serialVersionUID = 1L;
private String _tableName;
private HTableInterface table;
@@ -124,6 +130,8 @@ public class ThreatHbaseAdapter extends AbstractThreatAdapter {
return false;
}
-
+ @Override
+ public void cleanup() {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
deleted file mode 100644
index 45cf5a0..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.whois;
-
-import java.io.Serializable;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-public abstract class AbstractWhoisAdapter implements EnrichmentAdapter,Serializable{
-
- /**
- *
- */
- private static final long serialVersionUID = 8280523289446309728L;
- protected static final Logger LOG = LoggerFactory
- .getLogger(AbstractWhoisAdapter.class);
-
- abstract public boolean initializeAdapter();
- abstract public JSONObject enrich(String metadata);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
index 43f4149..bbcd0bd 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -18,6 +18,7 @@
package org.apache.metron.enrichment.adapters.whois;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
@@ -28,16 +29,19 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
import com.google.common.base.Joiner;
import org.apache.metron.tldextractor.BasicTldExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
+public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
+ Serializable {
- /**
- *
- */
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(WhoisHBaseAdapter.class);
private static final long serialVersionUID = 3371873619030870389L;
private HTableInterface table;
private String _table_name;
@@ -123,8 +127,13 @@ public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
return output;
}
-
-// private String format(String input) {
+
+ @Override
+ public void cleanup() {
+
+ }
+
+ // private String format(String input) {
// String output = input;
// String[] tokens = input.split("\\.");
// if(tokens.length > 2) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
new file mode 100644
index 0000000..f16c2fb
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -0,0 +1,57 @@
+package org.apache.metron.enrichment.bolt;
+
+import backtype.storm.task.TopologyContext;
+import org.apache.metron.bolt.JoinBolt;
+import org.apache.metron.domain.Enrichment;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(EnrichmentJoinBolt.class);
+
+ protected List<Enrichment> enrichments;
+
+ /**
+ * @param enrichments A class for sending tuples to enrichment bolt
+ * @return Instance of this class
+ */
+ public EnrichmentJoinBolt withEnrichments(List<Enrichment> enrichments) {
+ this.enrichments = enrichments;
+ return this;
+ }
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext) {
+
+ }
+
+ @Override
+ public Set<String> getStreamIds() {
+ Set<String> streamIds = new HashSet<>();
+ for(Enrichment enrichment: enrichments) {
+ streamIds.add(enrichment.getName());
+ }
+ return streamIds;
+ }
+
+
+ @Override
+ public JSONObject joinValues(Map<String, JSONObject> streamValueMap) {
+ JSONObject message = new JSONObject();
+ message.put("message", streamValueMap.get("message"));
+ JSONObject enrichment = new JSONObject();
+ for(String streamId: getStreamIds()) {
+ enrichment.put(streamId, streamValueMap.get(streamId));
+ }
+ message.put("enrichment", enrichment);
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
new file mode 100644
index 0000000..1563652
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.bolt;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import backtype.storm.topology.base.BaseRichBolt;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.metron.domain.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.helpers.topology.ErrorGenerator;
+
+/**
+ * Uses an adapter to enrich telemetry messages with additional metadata
+ * entries. For a list of available enrichment adapters see
+ * org.apache.metron.enrichment.adapters.
+ * <p/>
+ * At the moment of release the following enrichment adapters are available:
+ * <p/>
+ * <ul>
+ * <p/>
+ * <li>geo = attaches geo coordinates to IPs
+ * <li>whois = attaches whois information to domains
+ * <li>host = attaches reputation information to known hosts
+ * <li>CIF = attaches information from threat intelligence feeds
+ * <ul>
+ * <p/>
+ * <p/>
+ * Enrichments are optional
+ **/
+
+@SuppressWarnings({"rawtypes", "serial"})
+public class GenericEnrichmentBolt extends BaseRichBolt {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(GenericEnrichmentBolt.class);
+ private OutputCollector collector;
+
+
+ protected String streamId;
+ protected Enrichment<EnrichmentAdapter> enrichment;
+ protected EnrichmentAdapter adapter;
+ protected transient CacheLoader<String, JSONObject> loader;
+ protected transient LoadingCache<String, JSONObject> cache;
+ protected Long maxCacheSize;
+ protected Long maxTimeRetain;
+
+
+ /**
+ * @param enrichment Object holding enrichment metadata
+ * @return Instance of this class
+ */
+
+ public GenericEnrichmentBolt withEnrichment
+ (Enrichment<EnrichmentAdapter> enrichment) {
+ this.streamId = enrichment.getName();
+ this.enrichment = enrichment;
+ this.adapter = this.enrichment.getAdapter();
+ return this;
+ }
+
+ /**
+ * @param maxCacheSize Maximum size of cache before flushing
+ * @return Instance of this class
+ */
+
+ public GenericEnrichmentBolt withMaxCacheSize(long maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ return this;
+ }
+
+ /**
+ * @param maxTimeRetain Maximum time to retain cached entry before expiring
+ * @return Instance of this class
+ */
+
+ public GenericEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) {
+ this.maxTimeRetain = maxTimeRetain;
+ return this;
+ }
+
+ @Override
+ public void prepare(Map conf, TopologyContext topologyContext,
+ OutputCollector collector) {
+ this.collector = collector;
+ if (this.enrichment == null)
+ throw new IllegalStateException("enrichment must be specified");
+ if (this.maxCacheSize == null)
+ throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
+ if (this.maxTimeRetain == null)
+ throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
+ if (this.adapter == null)
+ throw new IllegalStateException("Adapter must be specified");
+ if (this.enrichment.getFields() == null)
+ throw new IllegalStateException(
+ "Fields to be enriched must be specified");
+ loader = new CacheLoader<String, JSONObject>() {
+ public JSONObject load(String key) throws Exception {
+ return adapter.enrich(key);
+ }
+ };
+ cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
+ .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+ .build(loader);
+ boolean success = adapter.initializeAdapter();
+ if (!success) {
+ LOG.error("[Metron] EnrichmentBolt could not initialize adapter");
+ throw new IllegalStateException("Could not initialize adapter...");
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declearer) {
+ declearer.declareStream(streamId, new Fields("key", "message"));
+ declearer.declareStream("error", new Fields("message"));
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void execute(Tuple tuple) {
+ String key = tuple.getStringByField("key");
+ JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
+ JSONObject enrichedMessage = new JSONObject();
+ try {
+ if (rawMessage == null || rawMessage.isEmpty())
+ throw new Exception("Could not parse binary stream to JSON");
+ if (key == null)
+ throw new Exception("Key is not valid");
+ for (String field : enrichment.getFields()) {
+ JSONObject enrichedField = new JSONObject();
+ String value = (String) rawMessage.get(field);
+ if (value != null && value.length() != 0) {
+ enrichedField = cache.getUnchecked(value);
+ if (enrichedField == null)
+ throw new Exception("[Metron] Could not enrich string: "
+ + value);
+ }
+ enrichedMessage.put(field, enrichedField);
+ }
+ if (!enrichedMessage.isEmpty()) {
+ collector.emit(streamId, new Values(key, enrichedMessage));
+ }
+ } catch (Exception e) {
+ LOG.error("[Metron] Unable to enrich message: " + rawMessage);
+ JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + rawMessage, e);
+ if (key != null) {
+ collector.emit(streamId, new Values(key, enrichedMessage));
+ }
+ collector.emit("error", new Values(error));
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ adapter.cleanup();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
deleted file mode 100644
index d3f2fb7..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.common;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-
-import com.codahale.metrics.Counter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.metrics.MetricReporter;
-
-@SuppressWarnings("rawtypes")
-public abstract class AbstractEnrichmentBolt extends BaseRichBolt {
- /**
- * Abstract enrichment bolt
- */
- private static final long serialVersionUID = -6710596708304282838L;
-
- protected static final Logger LOG = LoggerFactory
- .getLogger(AbstractEnrichmentBolt.class);
-
- protected OutputCollector _collector;
- protected String _OutputFieldName;
-
- protected String _enrichment_tag;
- protected Long _MAX_CACHE_SIZE_OBJECTS_NUM;
- protected Long _MAX_TIME_RETAIN_MINUTES;
-
- // JSON Keys to be enriched
- protected List<String> _jsonKeys;
- protected EnrichmentAdapter _adapter;
- protected MetricReporter _reporter;
-
- protected transient CacheLoader<String, JSONObject> loader;
- protected transient LoadingCache<String, JSONObject> cache;
-
- protected Counter ackCounter, emitCounter, failCounter;
-
- protected void registerCounters() {
-
- String ackString = _adapter.getClass().getSimpleName() + ".ack";
-
- String emitString = _adapter.getClass().getSimpleName() + ".emit";
-
- String failString = _adapter.getClass().getSimpleName() + ".fail";
-
- ackCounter = _reporter.registerCounter(ackString);
- emitCounter = _reporter.registerCounter(emitString);
- failCounter = _reporter.registerCounter(failString);
-
- }
-
- public final void prepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) {
- _collector = collector;
-
- if (this._OutputFieldName == null)
- throw new IllegalStateException("OutputFieldName must be specified");
- if (this._enrichment_tag == null)
- throw new IllegalStateException("enrichment_tag must be specified");
- if (this._MAX_CACHE_SIZE_OBJECTS_NUM == null)
- throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
- if (this._MAX_TIME_RETAIN_MINUTES == null)
- throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
- if (this._adapter == null)
- throw new IllegalStateException("Adapter must be specified");
- if (this._jsonKeys == null)
- throw new IllegalStateException(
- "JSON Keys to be enriched, must be specified");
-
- loader = new CacheLoader<String, JSONObject>() {
- public JSONObject load(String key) throws Exception {
- return _adapter.enrich(key);
- }
- };
-
- cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
- .expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
- .build(loader);
-
- boolean success = _adapter.initializeAdapter();
-
- if (!success) {
- LOG.error("[Metron] EnrichmentBolt could not initialize adapter");
- throw new IllegalStateException("Could not initialize adapter...");
- }
-
- try {
- doPrepare(conf, topologyContext, collector);
- } catch (IOException e) {
- LOG.error("[Metron] Counld not initialize...");
- e.printStackTrace();
- }
-
- }
-
- abstract void doPrepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) throws IOException;
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java
deleted file mode 100644
index 5915039..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/common/GenericEnrichmentBolt.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.common;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.helpers.topology.ErrorGenerator;
-import org.apache.metron.json.serialization.JSONEncoderHelper;
-import org.apache.metron.metrics.MetricReporter;
-
-/**
- * Uses an adapter to enrich telemetry messages with additional metadata
- * entries. For a list of available enrichment adapters see
- * org.apache.metron.enrichment.adapters.
- * <p>
- * At the moment of release the following enrichment adapters are available:
- * <p>
- * <ul>
- *
- * <li>geo = attaches geo coordinates to IPs
- * <li>whois = attaches whois information to domains
- * <li>host = attaches reputation information to known hosts
- * <li>CIF = attaches information from threat intelligence feeds
- * <ul>
- * <p>
- * <p>
- * Enrichments are optional
- **/
-
-@SuppressWarnings({ "rawtypes", "serial" })
-public class GenericEnrichmentBolt extends AbstractEnrichmentBolt {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(GenericEnrichmentBolt.class);
- private JSONObject metricConfiguration;
-
- /**
- * @param adapter
- * Adapter for doing the enrichment
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withAdapter(EnrichmentAdapter adapter) {
- _adapter = adapter;
- return this;
- }
-
- /**
- * @param OutputFieldName
- * Fieldname of the output tuple for this bolt
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withOutputFieldName(String OutputFieldName) {
- _OutputFieldName = OutputFieldName;
- return this;
- }
-
- /**
- * @param EnrichmentTag
- * Defines what tag the enrichment will be tagged with in the
- * telemetry message
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withEnrichmentTag(String EnrichmentTag) {
- _enrichment_tag = EnrichmentTag;
- return this;
- }
-
- /**
- * @param MAX_CACHE_SIZE_OBJECTS_NUM
- * Maximum size of cache before flushing
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withMaxCacheSize(long MAX_CACHE_SIZE_OBJECTS_NUM) {
- _MAX_CACHE_SIZE_OBJECTS_NUM = MAX_CACHE_SIZE_OBJECTS_NUM;
- return this;
- }
-
- /**
- * @param MAX_TIME_RETAIN_MINUTES
- * Maximum time to retain cached entry before expiring
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withMaxTimeRetain(long MAX_TIME_RETAIN_MINUTES) {
- _MAX_TIME_RETAIN_MINUTES = MAX_TIME_RETAIN_MINUTES;
- return this;
- }
-
- /**
- * @param jsonKeys
- * Keys in the telemetry message that are to be enriched by this
- * bolt
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withKeys(List<String> jsonKeys) {
- _jsonKeys = jsonKeys;
- return this;
- }
-
- /**
- * @param config
- * A class for generating custom metrics into graphite
- * @return Instance of this class
- */
-
- public GenericEnrichmentBolt withMetricConfiguration(Configuration config) {
- this.metricConfiguration = JSONEncoderHelper.getJSON(config
- .subset("org.apache.metron.metrics"));
- return this;
- }
-
- @SuppressWarnings("unchecked")
- public void execute(Tuple tuple) {
-
- LOG.trace("[Metron] Starting enrichment");
-
- JSONObject in_json = null;
- String key = null;
-
- try {
-
- key = tuple.getStringByField("key");
- in_json = (JSONObject) tuple.getValueByField("message");
-
- if (in_json == null || in_json.isEmpty())
- throw new Exception("Could not parse binary stream to JSON");
-
- if(key == null)
- throw new Exception("Key is not valid");
-
- LOG.trace("[Metron] Received tuple: " + in_json);
-
- JSONObject message = (JSONObject) in_json.get("message");
-
- if (message == null || message.isEmpty())
- throw new Exception("Could not extract message from JSON: "
- + in_json);
-
- LOG.trace("[Metron] Extracted message: " + message);
-
- for (String jsonkey : _jsonKeys) {
- LOG.trace("[Metron] Processing:" + jsonkey + " within:"
- + message);
-
- String jsonvalue = (String) message.get(jsonkey);
- LOG.trace("[Metron] Processing: " + jsonkey + " -> "
- + jsonvalue);
-
- if (null == jsonvalue) {
- LOG.trace("[Metron] Key " + jsonkey
- + "not present in message " + message);
- continue;
- }
-
- // If the field is empty, no need to enrich
- if ( jsonvalue.length() == 0) {
- continue;
- }
-
- JSONObject enrichment = cache.getUnchecked(jsonvalue);
- LOG.trace("[Metron] Enriched: " + jsonkey + " -> "
- + enrichment);
-
- if (enrichment == null)
- throw new Exception("[Metron] Could not enrich string: "
- + jsonvalue);
-
- if (!in_json.containsKey("enrichment")) {
- in_json.put("enrichment", new JSONObject());
- LOG.trace("[Metron] Starting a string of enrichments");
- }
-
- JSONObject enr1 = (JSONObject) in_json.get("enrichment");
-
- if (enr1 == null)
- throw new Exception("Internal enrichment is empty");
-
- if (!enr1.containsKey(_enrichment_tag)) {
- enr1.put(_enrichment_tag, new JSONObject());
- LOG.trace("[Metron] Starting a new enrichment");
- }
-
- LOG.trace("[Metron] ENR1 is: " + enr1);
-
- JSONObject enr2 = (JSONObject) enr1.get(_enrichment_tag);
- enr2.put(jsonkey, enrichment);
-
- LOG.trace("[Metron] ENR2 is: " + enr2);
-
- enr1.put(_enrichment_tag, enr2);
- in_json.put("enrichment", enr1);
- }
-
- LOG.debug("[Metron] Generated combined enrichment: " + in_json);
-
- _collector.emit("message", new Values(key, in_json));
- _collector.ack(tuple);
-
- if (_reporter != null) {
- emitCounter.inc();
- ackCounter.inc();
- }
- } catch (Exception e) {
-
- LOG.error("[Metron] Unable to enrich message: " + in_json);
- _collector.fail(tuple);
-
- if (_reporter != null) {
- failCounter.inc();
- }
-
- JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + in_json, e);
- _collector.emit("error", new Values(error));
- }
-
-
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declearer) {
- declearer.declareStream("message", new Fields("key", "message"));
- declearer.declareStream("error", new Fields("message"));
- }
-
- @Override
- void doPrepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) throws IOException {
- LOG.info("[Metron] Preparing Enrichment Bolt...");
-
- _collector = collector;
-
- try {
- _reporter = new MetricReporter();
- _reporter.initialize(metricConfiguration,
- GenericEnrichmentBolt.class);
- this.registerCounters();
- } catch (Exception e) {
- LOG.info("[Metron] Unable to initialize metrics reporting");
- }
-
- LOG.info("[Metron] Enrichment bolt initialized...");
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
index a54dc6e..f5b6a24 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
@@ -19,6 +19,9 @@ package org.apache.metron.enrichment.adapters.geo;
import java.net.URL;
import java.util.Properties;
+import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
+import org.apache.metron.enrichment.adapters.jdbc.JdbcConfig;
+import org.apache.metron.enrichment.adapters.jdbc.MySqlConfig;
import org.json.simple.JSONObject;
import org.apache.metron.test.AbstractSchemaTest;
@@ -35,7 +38,7 @@ import org.junit.Assert;
*/
public class GeoMysqlAdapterTest extends AbstractSchemaTest {
- private static GeoMysqlAdapter geoMySqlAdapter=null;
+ private static JdbcAdapter geoMySqlAdapter=null;
private static boolean connected=false;
/**
@@ -75,7 +78,14 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
return;//skip tests
}else{
- GeoMysqlAdapterTest.setGeoMySqlAdapter(new GeoMysqlAdapter((String)prop.get("mysql.ip"), (new Integer((String)prop.get("mysql.port"))).intValue(),(String)prop.get("mysql.username"),(String)prop.get("mysql.password"), (String)prop.get("bolt.enrichment.geo.adapter.table")));
+ MySqlConfig mySqlConfig = new MySqlConfig();
+ mySqlConfig.setHost((String)prop.get("mysql.ip"));
+ mySqlConfig.setPort(new Integer((String) prop.get("mysql.port")));
+ mySqlConfig.setUsername((String)prop.get("mysql.username"));
+ mySqlConfig.setPassword((String)prop.get("mysql.password"));
+ mySqlConfig.setTable((String)prop.get("bolt.enrichment.geo.adapter.table"));
+ JdbcAdapter geoAdapter = new GeoAdapter().withJdbcConfig(mySqlConfig);
+ GeoMysqlAdapterTest.setGeoMySqlAdapter(geoAdapter);
connected =geoMySqlAdapter.initializeAdapter();
Assert.assertTrue(connected);
URL schema_url = getClass().getClassLoader().getResource(
@@ -95,7 +105,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
}
/**
- * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter#enrich(java.lang.String)}.
+ * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoAdapter#enrich(java.lang.String)}.
*/
public void testEnrich() {
if(skipTests(this.getMode())){
@@ -123,7 +133,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
}
/**
- * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter#initializeAdapter()}.
+ * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoAdapter#initializeAdapter()}.
*/
public void testInitializeAdapter() {
if(skipTests(this.getMode())){
@@ -135,7 +145,13 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
}
/**
- * Test method for {@link org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter#GeoMysqlAdapter(java.lang.String, int, java.lang.String, java.lang.String, java.lang.String)}.
+ * Test method for
+ *
+ *
+ *
+ *
+ *
+ * {@link org.apache.metron.enrichment.adapters.geo.GeoAdapter}.
*/
public void testGeoMysqlAdapter() {
if(skipTests(this.getMode())){
@@ -150,7 +166,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
* @return the geoMySqlAdapter.
*/
- public static GeoMysqlAdapter getGeoMySqlAdapter() {
+ public static JdbcAdapter getGeoMySqlAdapter() {
return geoMySqlAdapter;
}
@@ -159,7 +175,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
* @param geoMySqlAdapter the geoMySqlAdapter.
*/
- public static void setGeoMySqlAdapter(GeoMysqlAdapter geoMySqlAdapter) {
+ public static void setGeoMySqlAdapter(JdbcAdapter geoMySqlAdapter) {
GeoMysqlAdapterTest.geoMySqlAdapter = geoMySqlAdapter;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
index d36021f..4de84a2 100644
--- a/metron-streaming/Metron-MessageParsers/pom.xml
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -29,6 +29,11 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-EnrichmentAdapters</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${global_storm_version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java
new file mode 100644
index 0000000..49c80d7
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/PcapParserBolt.java
@@ -0,0 +1,27 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+
+public class PcapParserBolt extends TelemetryParserBolt {
+
+ @Override
+ public void declareOther(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("raw", new Fields("key", "value", "timestamp") );
+ }
+
+ @Override
+ public void emitOther(Tuple tuple, List<JSONObject> messages) {
+ for(JSONObject message: messages) {
+ String key = (String) message.get("pcap_id");
+ long timestamp = (long) message.get("ts_micro");
+ collector.emit("raw", tuple, new Values(key, tuple.getBinary(0),
+ timestamp));
+ }
+ }
+}