You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/10 21:17:40 UTC
[2/4] incubator-metron git commit: METRON-33 Execute Enrichments in
Parallel (merrimanr via cestella) closes apache/incubator-metron#19
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
index a0cb8c8..0841762 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
@@ -3,11 +3,17 @@ package org.apache.metron.parsing.parsers;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
import org.krakenapps.pcap.decoder.ethernet.EthernetType;
import org.krakenapps.pcap.decoder.ip.IpDecoder;
@@ -24,42 +30,59 @@ import org.apache.metron.pcap.MetronEthernetDecoder;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapByteInputStream;
-/**
- * The Class PcapParser.
- *
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public final class PcapParser {
+public class PcapParser implements MessageParser<JSONObject>, Serializable {
- /** The Constant LOG. */
private static final Logger LOG = Logger.getLogger(PcapParser.class);
- /** The ETHERNET_DECODER. */
- private static final EthernetDecoder ETHERNET_DECODER = new MetronEthernetDecoder();
-
- /** The ip decoder. */
- private static final IpDecoder IP_DECODER = new IpDecoder();
-
- // /** The tcp decoder. */
- // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
- // TcpPortProtocolMapper());
- //
- // /** The udp decoder. */
- // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
- // UdpPortProtocolMapper());
+ private EthernetDecoder ethernetDecoder;
+ private long timePrecisionDivisor = 1L;
+
+ public PcapParser withTsPrecision(String tsPrecision) {
+ if (tsPrecision.equalsIgnoreCase("MILLI")) {
+ //Convert nanos to millis
+ LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
+ timePrecisionDivisor = 1000000L;
+ } else if (tsPrecision.equalsIgnoreCase("MICRO")) {
+ //Convert nanos to micro
+ LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
+ timePrecisionDivisor = 1000L;
+ } else if (tsPrecision.equalsIgnoreCase("NANO")) {
+ //Keep nano as is.
+ LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
+ timePrecisionDivisor = 1L;
+ } else {
+ LOG.info("bolt.parser.ts.precision not set. Default to NANO");
+ timePrecisionDivisor = 1L;
+ }
+ return this;
+ }
- static {
- // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
- // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
- ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
+ @Override
+ public void init() {
+ ethernetDecoder = new MetronEthernetDecoder();
+ IpDecoder ipDecoder = new IpDecoder();
+ ethernetDecoder.register(EthernetType.IPV4, ipDecoder);
}
- /**
- * Instantiates a new pcap parser.
- */
- private PcapParser() { // $codepro.audit.disable emptyMethod
+ @Override
+ public List<JSONObject> parse(byte[] pcap) {
+ List<JSONObject> messages = new ArrayList<>();
+ List<PacketInfo> packetInfoList = new ArrayList<>();
+ try {
+ packetInfoList = getPacketInfo(pcap);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ for (PacketInfo packetInfo : packetInfoList) {
+ JSONObject message = (JSONObject) JSONValue.parse(packetInfo.getJsonIndexDoc());
+ messages.add(message);
+ }
+ return messages;
+ }
+ @Override
+ public boolean validate(JSONObject message) {
+ return true;
}
/**
@@ -73,7 +96,7 @@ public final class PcapParser {
* @throws IOException
* Signals that an I/O exception has occurred.
*/
- public static List<PacketInfo> parse(byte[] pcap) throws IOException {
+ public List<PacketInfo> getPacketInfo(byte[] pcap) throws IOException {
List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(pcap);
@@ -96,7 +119,7 @@ public final class PcapParser {
// LOG.trace("Got packet # " + ++packetCounter);
// LOG.trace(packet.getPacketData());
- ETHERNET_DECODER.decode(packet);
+ ethernetDecoder.decode(packet);
PacketHeader packetHeader = packet.getPacketHeader();
Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
@@ -148,13 +171,13 @@ public final class PcapParser {
double totalIterations = 1000000;
double parallelism = 64;
double targetEvents = 1000000;
-
+ PcapParser pcapParser = new PcapParser();
File fin = new File("/Users/sheetal/Downloads/bad_packets/bad_packet_1405988125427.pcap");
File fout = new File(fin.getAbsolutePath() + ".parsed");
byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
long startTime = System.currentTimeMillis();
for (int i = 0; i < totalIterations; i++) {
- List<PacketInfo> list = parse(pcapBytes);
+ List<PacketInfo> list = pcapParser.getPacketInfo(pcapBytes);
for (PacketInfo packetInfo : list) {
System.out.println(packetInfo.getJsonIndexDoc());
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java
new file mode 100644
index 0000000..b2b5f7a
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java
@@ -0,0 +1,26 @@
+package org.apache.metron.parsing.utils;
+import java.io.Serializable;
+
+import com.google.code.regexp.Pattern;
+
+public class GrokUtils implements Serializable {
+
+ private static final long serialVersionUID = 7465176887422419286L;
+ /**
+ * Extracts a Grok pattern reference such as %{FOO} to FOO; also matches Grok patterns that carry a semantic name, e.g. %{FOO:bar}.
+ */
+ public static final Pattern GROK_PATTERN = Pattern.compile(
+ "%\\{" +
+ "(?<name>" +
+ "(?<pattern>[A-z0-9]+)" +
+ "(?::(?<subname>[A-z0-9_:;\\/\\s\\.]+))?" +
+ ")" +
+ "(?:=(?<definition>" +
+ "(?:" +
+ "(?:[^{}]+|\\.+)+" +
+ ")+" +
+ ")" +
+ ")?" +
+ "\\}");
+
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java
new file mode 100644
index 0000000..e1a3e2c
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java
@@ -0,0 +1,55 @@
+package org.apache.metron.parsing.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+public class ParserUtils {
+
+ public static final String PREFIX = "stream2file";
+ public static final String SUFFIX = ".tmp";
+
+ public static File stream2file(InputStream in) throws IOException {
+ final File tempFile = File.createTempFile(PREFIX, SUFFIX);
+ tempFile.deleteOnExit();
+ try (FileOutputStream out = new FileOutputStream(tempFile)) {
+ IOUtils.copy(in, out);
+ }
+ return tempFile;
+ }
+
+ public static Long convertToEpoch(String m, String d, String ts,
+ boolean adjust_timezone) throws ParseException {
+ d = d.trim();
+ if (d.length() <= 2) {
+ d = "0" + d;
+ }
+ Date date = new SimpleDateFormat("MMM", Locale.ENGLISH).parse(m);
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(date);
+ String month = String.valueOf(cal.get(Calendar.MONTH));
+ int year = Calendar.getInstance().get(Calendar.YEAR);
+ if (month.length() <= 2) {
+ month = "0" + month;
+ }
+ String coglomerated_ts = year + "-" + month + "-" + d + " " + ts;
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ if (adjust_timezone) {
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ }
+ date = sdf.parse(coglomerated_ts);
+ long timeInMillisSinceEpoch = date.getTime();
+ return timeInMillisSinceEpoch;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
index 2f643d7..acd919c 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
@@ -8,7 +8,7 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
-
+import org.junit.Assert;
import org.apache.metron.parsing.parsers.BasicBroParser;
public class BasicBroParserTest extends TestCase {
@@ -21,12 +21,12 @@ public class BasicBroParserTest extends TestCase {
/**
* Constructs a new <code>BasicBroParserTest</code> instance.
- *
+ *
* @throws Exception
*/
public BasicBroParserTest() throws Exception {
broParser = new BasicBroParser();
- jsonParser = new JSONParser();
+ jsonParser = new JSONParser();
}
public void testUnwrappedBroMessage() throws ParseException {
@@ -34,91 +34,91 @@ public class BasicBroParserTest extends TestCase {
JSONObject rawJson = (JSONObject)jsonParser.parse(rawMessage);
- JSONObject broJson = broParser.parse(rawMessage.getBytes());
-
- assertEquals(broJson.get("timestamp"), Long.parseLong(rawJson.get("timestamp").toString()));
- assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("source_ip").toString());
- assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("dest_ip").toString());
- assertEquals(broJson.get("ip_src_port"), rawJson.get("source_port"));
- assertEquals(broJson.get("ip_dst_port"), rawJson.get("dest_port"));
- assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
- assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
- assertEquals(broJson.get("sensor").toString(), rawJson.get("sensor").toString());
- assertEquals(broJson.get("protocol").toString(), rawJson.get("type").toString());
- assertEquals(broJson.get("rcode").toString(), rawJson.get("rcode").toString());
- assertEquals(broJson.get("rcode_name").toString(), rawJson.get("rcode_name").toString());
- assertTrue(broJson.get("original_string").toString().startsWith("DNS"));
+ JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+
+ Assert.assertEquals(broJson.get("timestamp"), Long.parseLong(rawJson.get("timestamp").toString()));
+ Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("source_ip").toString());
+ Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("dest_ip").toString());
+ Assert.assertEquals(broJson.get("ip_src_port"), rawJson.get("source_port"));
+ Assert.assertEquals(broJson.get("ip_dst_port"), rawJson.get("dest_port"));
+ Assert.assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+ Assert.assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
+ Assert.assertEquals(broJson.get("sensor").toString(), rawJson.get("sensor").toString());
+ Assert.assertEquals(broJson.get("protocol").toString(), rawJson.get("type").toString());
+ Assert.assertEquals(broJson.get("rcode").toString(), rawJson.get("rcode").toString());
+ Assert.assertEquals(broJson.get("rcode_name").toString(), rawJson.get("rcode_name").toString());
+ Assert.assertTrue(broJson.get("original_string").toString().startsWith("DNS"));
}
@SuppressWarnings("rawtypes")
public void testHttpBroMessage() throws ParseException {
String rawMessage = "{\"http\":{\"ts\":1402307733473,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";
-
+
Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-
- JSONObject broJson = broParser.parse(rawMessage.getBytes());
- assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
- assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
- assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
- assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
- assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
- assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
-
- assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
- assertEquals(broJson.get("method").toString(), rawJson.get("method").toString());
- assertEquals(broJson.get("host").toString(), rawJson.get("host").toString());
- assertEquals(broJson.get("resp_mime_types").toString(), rawJson.get("resp_mime_types").toString());
+
+ JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+ Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+ Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+ Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
+ Assert.assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
+ Assert.assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
+ Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+ Assert.assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+ Assert.assertEquals(broJson.get("method").toString(), rawJson.get("method").toString());
+ Assert.assertEquals(broJson.get("host").toString(), rawJson.get("host").toString());
+ Assert.assertEquals(broJson.get("resp_mime_types").toString(), rawJson.get("resp_mime_types").toString());
}
-
+
@SuppressWarnings("rawtypes")
public void testDnsBroMessage() throws ParseException {
String rawMessage = "{\"dns\":{\"ts\":1402308259609,\"uid\":\"CuJT272SKaJSuqO0Ia\",\"id.orig_h\":\"10.122.196.204\",\"id.orig_p\":33976,\"id.resp_h\":\"144.254.71.184\",\"id.resp_p\":53,\"proto\":\"udp\",\"trans_id\":62418,\"query\":\"www.cisco.com\",\"qclass\":1,\"qclass_name\":\"C_INTERNET\",\"qtype\":28,\"qtype_name\":\"AAAA\",\"rcode\":0,\"rcode_name\":\"NOERROR\",\"AA\":true,\"TC\":false,\"RD\":true,\"RA\":true,\"Z\":0,\"answers\":[\"www.cisco.com.akadns.net\",\"origin-www.cisco.com\",\"2001:420:1201:2::a\"],\"TTLs\":[3600.0,289.0,14.0],\"rejected\":false}}";
-
+
Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-
- JSONObject broJson = broParser.parse(rawMessage.getBytes());
- assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
- assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
- assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
- assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
- assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
- assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
-
- assertEquals(broJson.get("qtype").toString(), rawJson.get("qtype").toString());
- assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
+
+ JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+ Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+ Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+ Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
+ Assert.assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
+ Assert.assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
+ Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+ Assert.assertEquals(broJson.get("qtype").toString(), rawJson.get("qtype").toString());
+ Assert.assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
}
-
+
@SuppressWarnings("rawtypes")
public void testFilesBroMessage() throws ParseException {
String rawMessage = "{\"files\":{\"analyzers\": [\"X509\",\"MD5\",\"SHA1\"],\"conn_uids\":[\"C4tygJ3qxJBEJEBCeh\"],\"depth\": 0,\"duration\": 0.0,\"fuid\":\"FZEBC33VySG0nHSoO9\",\"is_orig\": false,\"local_orig\": false,\"md5\": \"eba37166385e3ef42464ed9752e99f1b\",\"missing_bytes\": 0,\"overflow_bytes\": 0,\"rx_hosts\": [\"10.220.15.205\"],\"seen_bytes\": 1136,\"sha1\": \"73e42686657aece354fbf685712361658f2f4357\",\"source\": \"SSL\",\"timedout\": false,\"ts\": \"1425845251334\",\"tx_hosts\": [\"68.171.237.7\"]}}";
-
+
Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-
- JSONObject broJson = broParser.parse(rawMessage.getBytes());
- assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
- assertEquals(broJson.get("ip_src_addr").toString(), ((JSONArray)rawJson.get("tx_hosts")).get(0).toString());
- assertEquals(broJson.get("ip_dst_addr").toString(), ((JSONArray)rawJson.get("rx_hosts")).get(0).toString());
- assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
-
- assertEquals(broJson.get("fuid").toString(), rawJson.get("fuid").toString());
- assertEquals(broJson.get("md5").toString(), rawJson.get("md5").toString());
- assertEquals(broJson.get("analyzers").toString(), rawJson.get("analyzers").toString());
+
+ JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+ Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+ Assert.assertEquals(broJson.get("ip_src_addr").toString(), ((JSONArray)rawJson.get("tx_hosts")).get(0).toString());
+ Assert.assertEquals(broJson.get("ip_dst_addr").toString(), ((JSONArray)rawJson.get("rx_hosts")).get(0).toString());
+ Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+ Assert.assertEquals(broJson.get("fuid").toString(), rawJson.get("fuid").toString());
+ Assert.assertEquals(broJson.get("md5").toString(), rawJson.get("md5").toString());
+ Assert.assertEquals(broJson.get("analyzers").toString(), rawJson.get("analyzers").toString());
}
-
+
@SuppressWarnings("rawtypes")
public void testProtocolKeyCleanedUp() throws ParseException {
String rawMessage = "{\"ht*tp\":{\"ts\":1402307733473,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";
-
+
Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-
- JSONObject broJson = broParser.parse(rawMessage.getBytes());
-
- assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
- assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
- assertTrue(broJson.get("original_string").toString().startsWith("HTTP"));
+
+ JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+
+ Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+ Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+ Assert.assertTrue(broJson.get("original_string").toString().startsWith("HTTP"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
index 9adf446..1eefc27 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
@@ -78,12 +78,18 @@ public class BasicFireEyeParserTest extends AbstractConfigTest
}
/**
- * Test method for {@link org.apache.metron.parsing.parsers.BasicFireEyeParser#parse(java.lang.String)}.
+ * Test method for
+ *
+ *
+ *
+ *
+ *
+ * {@link org.apache.metron.parsing.parsers.BasicFireEyeParser#parse(byte[])}.
*/
@SuppressWarnings({ "rawtypes"})
public void testParse() {
for (String inputString : getInputStrings()) {
- JSONObject parsed = parser.parse(inputString.getBytes());
+ JSONObject parsed = parser.parse(inputString.getBytes()).get(0);
Assert.assertNotNull(parsed);
JSONParser parser = new JSONParser();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
index e8842b9..03d31a3 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
@@ -110,7 +110,7 @@ public class BasicIseParserTest extends AbstractSchemaTest {
*/
public void testParse() throws ParseException, IOException, Exception {
for (String inputString : getInputStrings()) {
- JSONObject parsed = parser.parse(inputString.getBytes());
+ JSONObject parsed = parser.parse(inputString.getBytes()).get(0);
assertNotNull(parsed);
System.out.println(parsed);
@@ -139,7 +139,7 @@ public class BasicIseParserTest extends AbstractSchemaTest {
/**
* Sets the iseParser.
*
- * @param iseParser
+ * @param parser
*/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
index 9667699..70cf089 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
@@ -104,7 +104,7 @@ public class BasicLancopeParserTest extends AbstractSchemaTest {
public void testParse() throws IOException, Exception {
for (String inputString : getInputStrings()) {
- JSONObject parsed = parser.parse(inputString.getBytes());
+ JSONObject parsed = parser.parse(inputString.getBytes()).get(0);
assertNotNull(parsed);
System.out.println(parsed);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
index 7a0b9d4..d989b6c 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
@@ -72,12 +72,13 @@ public class BasicPaloAltoFirewallParserTest extends AbstractConfigTest {
}
/**
- * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(java.lang.String)}.
+ * Test method for
+ * {@link org.apache.metron.parsing.parsers.BasicPaloAltoFirewallParser#parse(byte[])}.
*/
@SuppressWarnings({ "rawtypes" })
public void testParse() {
for (String inputString : getInputStrings()) {
- JSONObject parsed = paParser.parse(inputString.getBytes());
+ JSONObject parsed = paParser.parse(inputString.getBytes()).get(0);
Assert.assertNotNull(parsed);
System.out.println(parsed);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
index dd56eeb..3ac44fb 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
@@ -78,13 +78,13 @@ public class BasicSourcefireParserTest extends AbstractConfigTest
}
/**
- * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(java.lang.String)}.
+ * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(byte[])}.
*/
@SuppressWarnings({ "rawtypes", "unused" })
public void testParse() {
for (String sourceFireString : getSourceFireStrings()) {
byte[] srcBytes = sourceFireString.getBytes();
- JSONObject parsed = sourceFireParser.parse(sourceFireString.getBytes());
+ JSONObject parsed = sourceFireParser.parse(sourceFireString.getBytes()).get(0);
Assert.assertNotNull(parsed);
System.out.println(parsed);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
index 01a1210..2a543fe 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
@@ -85,7 +85,7 @@ public class BroParserTest extends AbstractConfigTest {
public void testParse() throws ParseException {
for (String inputString : getInputStrings()) {
- JSONObject cleanJson = parser.parse(inputString.getBytes());
+ JSONObject cleanJson = parser.parse(inputString.getBytes()).get(0);
Assert.assertNotNull(cleanJson);
System.out.println(cleanJson);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
index 86f23bd..a59d5c8 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
@@ -75,13 +75,13 @@ public class GrokAsaParserTest extends AbstractConfigTest{
}
/**
- * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(java.lang.String)}.
+ * Test method for {@link org.apache.metron.parsing.parsers.GrokAsaParser#parse(byte[])}.
*/
@SuppressWarnings({ "rawtypes" })
public void testParse() {
for (String grokAsaString : getGrokAsaStrings()) {
- JSONObject parsed = grokAsaParser.parse(grokAsaString.getBytes());
+ JSONObject parsed = grokAsaParser.parse(grokAsaString.getBytes()).get(0);
Assert.assertNotNull(parsed);
System.out.println(parsed);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
index 13f4c99..b922916 100644
--- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
@@ -109,7 +109,13 @@ public class GenericInternalTestSpout extends BaseRichSpout {
if(cnt < jsons.size())
{
- _collector.emit(new Values(_converter.convert(jsons.get(cnt))));
+ byte[] value;
+ if (_converter != null) {
+ value = _converter.convert(jsons.get(cnt));
+ } else {
+ value = jsons.get(cnt).getBytes();
+ }
+ _collector.emit(new Values(value));
}
cnt ++;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
index 1c11207..d20b050 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
@@ -1,3 +1,4 @@
10.1.128.236={"local":"YES", "type":"webserver", "asset_value" : "important"}
10.1.128.237={"local":"UNKNOWN", "type":"unknown", "asset_value" : "important"}
-10.60.10.254={"local":"YES", "type":"printer", "asset_value" : "important"}
\ No newline at end of file
+10.60.10.254={"local":"YES", "type":"printer", "asset_value" : "important"}
+10.0.2.15={"local":"YES", "type":"webserver", "asset_value" : "important"}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
index 8615d8f..6c6c63b 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
@@ -3,36 +3,58 @@ config:
topology.workers: 1
components:
- - id: "asaParser"
+ - id: "parser"
className: "org.apache.metron.parsing.parsers.GrokAsaParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "geoKeys"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - id: "jdbcConfig"
+ className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+ properties:
+ - name: "host"
+ value: "${mysql.ip}"
+ - name: "port"
+ value: ${mysql.port}
+ - name: "username"
+ value: "${mysql.username}"
+ - name: "password"
+ value: "${mysql.password}"
+ - name: "table"
+ value: "GEO"
- id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
- constructorArgs:
- - "${mysql.ip}"
- - ${mysql.port}
- - "${mysql.username}"
- - "${mysql.password}"
- - "GEO"
- - id: "hostsKeys"
- className: "java.util.ArrayList"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - name: "withJdbcConfig"
+ args:
+ - ref: "jdbcConfig"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "geo"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "geoEnrichmentAdapter"
- id: "hostEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
constructorArgs:
- '${org.apache.metron.enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "host"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "hostEnrichmentAdapter"
+ - id: "enrichments"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "geoEnrichment"
+ - name: "add"
+ args:
+ - ref: "hostEnrichment"
- id: "indexAdapter"
className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- id: "alertsConfig"
@@ -135,60 +157,14 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- - ref: "asaParser"
- - name: "withOutputFieldName"
+ - ref: "parser"
+ - name: "withEnrichments"
args:
- - "asa"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["geo"]
- - name: "withAdapter"
- args:
- - ref: "geoEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["asa"]
- - name: "withKeys"
- args:
- - ref: "geoKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["host"]
- - name: "withAdapter"
- args:
- - ref: "hostEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["asa"]
- - name: "withKeys"
- args:
- - ref: "hostsKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
+ - ref: "enrichments"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
@@ -297,6 +273,36 @@ bolts:
- name: "withMetricConfiguration"
args:
- ref: "metricConfig"
+ - id: "geoEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "geoEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "hostEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "hostEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "joinBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
streams:
- name: "spout -> parser"
@@ -304,22 +310,43 @@ streams:
to: "parserBolt"
grouping:
type: SHUFFLE
+ - name: "parser -> host"
+ from: "parserBolt"
+ to: "hostEnrichmentBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
- name: "parser -> geo"
from: "parserBolt"
to: "geoEnrichmentBolt"
grouping:
+ streamId: "geo"
type: FIELDS
+ args: ["key"]
+ - name: "parser -> join"
+ from: "parserBolt"
+ to: "joinBolt"
+ grouping:
streamId: "message"
+ type: FIELDS
args: ["key"]
- - name: "geo -> host"
+ - name: "geo -> join"
from: "geoEnrichmentBolt"
- to: "hostEnrichmentBolt"
+ to: "joinBolt"
grouping:
+ streamId: "geo"
type: FIELDS
- streamId: "message"
args: ["key"]
- - name: "host -> alerts"
+ - name: "host -> join"
from: "hostEnrichmentBolt"
+ to: "joinBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
+ - name: "join -> alerts"
+ from: "joinBolt"
to: "alertsBolt"
grouping:
streamId: "message"
@@ -331,8 +358,8 @@ streams:
grouping:
streamId: "message"
type: SHUFFLE
- - name: "alerts -> indexing"
- from: "alertsBolt"
+ - name: "join -> indexing"
+ from: "joinBolt"
to: "indexingBolt"
grouping:
streamId: "message"
@@ -355,4 +382,4 @@ streams:
to: "errorIndexingBolt"
grouping:
streamId: "error"
- type: SHUFFLE
+ type: SHUFFLE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
index e3b01d6..01492c5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
@@ -3,36 +3,58 @@ config:
topology.workers: 1
components:
- - id: "asaParser"
+ - id: "parser"
className: "org.apache.metron.parsing.parsers.GrokAsaParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "geoKeys"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - id: "jdbcConfig"
+ className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+ properties:
+ - name: "host"
+ value: "${mysql.ip}"
+ - name: "port"
+ value: ${mysql.port}
+ - name: "username"
+ value: "${mysql.username}"
+ - name: "password"
+ value: "${mysql.password}"
+ - name: "table"
+ value: "GEO"
- id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
- constructorArgs:
- - "${mysql.ip}"
- - ${mysql.port}
- - "${mysql.username}"
- - "${mysql.password}"
- - "GEO"
- - id: "hostsKeys"
- className: "java.util.ArrayList"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - name: "withJdbcConfig"
+ args:
+ - ref: "jdbcConfig"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "geo"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "geoEnrichmentAdapter"
- id: "hostEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
constructorArgs:
- '${org.apache.metron.enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "host"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "hostEnrichmentAdapter"
+ - id: "enrichments"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "geoEnrichment"
+ - name: "add"
+ args:
+ - ref: "hostEnrichment"
- id: "indexAdapter"
className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- id: "alertsConfig"
@@ -130,11 +152,11 @@ components:
# zookeeper hosts
- ref: "zkHosts"
# topic name
- - "${spout.kafka.topic.asa}"
+ - "${spout.kafka.topic.pcap}"
# zk root
- ""
# id
- - "${spout.kafka.topic.asa}"
+ - "${spout.kafka.topic.pcap}"
properties:
- name: "forceFromStart"
value: true
@@ -149,60 +171,14 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- - ref: "asaParser"
- - name: "withOutputFieldName"
- args:
- - "asa"
- - name: "withMessageFilter"
+ - ref: "parser"
+ - name: "withEnrichments"
args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["geo"]
- - name: "withAdapter"
- args:
- - ref: "geoEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["asa"]
- - name: "withKeys"
- args:
- - ref: "geoKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["host"]
- - name: "withAdapter"
- args:
- - ref: "hostEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["asa"]
- - name: "withKeys"
- args:
- - ref: "hostsKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
+ - ref: "enrichments"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
@@ -311,6 +287,36 @@ bolts:
- name: "withMetricConfiguration"
args:
- ref: "metricConfig"
+ - id: "geoEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "geoEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "hostEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "hostEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "joinBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
streams:
- name: "spout -> parser"
@@ -318,22 +324,43 @@ streams:
to: "parserBolt"
grouping:
type: SHUFFLE
+ - name: "parser -> host"
+ from: "parserBolt"
+ to: "hostEnrichmentBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
- name: "parser -> geo"
from: "parserBolt"
to: "geoEnrichmentBolt"
grouping:
+ streamId: "geo"
type: FIELDS
+ args: ["key"]
+ - name: "parser -> join"
+ from: "parserBolt"
+ to: "joinBolt"
+ grouping:
streamId: "message"
+ type: FIELDS
args: ["key"]
- - name: "geo -> host"
+ - name: "geo -> join"
from: "geoEnrichmentBolt"
- to: "hostEnrichmentBolt"
+ to: "joinBolt"
grouping:
+ streamId: "geo"
type: FIELDS
- streamId: "message"
args: ["key"]
- - name: "host -> alerts"
+ - name: "host -> join"
from: "hostEnrichmentBolt"
+ to: "joinBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
+ - name: "join -> alerts"
+ from: "joinBolt"
to: "alertsBolt"
grouping:
streamId: "message"
@@ -345,8 +372,8 @@ streams:
grouping:
streamId: "message"
type: SHUFFLE
- - name: "alerts -> indexing"
- from: "alertsBolt"
+ - name: "join -> indexing"
+ from: "joinBolt"
to: "indexingBolt"
grouping:
streamId: "message"
@@ -369,4 +396,4 @@ streams:
to: "errorIndexingBolt"
grouping:
streamId: "error"
- type: SHUFFLE
+ type: SHUFFLE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
index e1ea080..8013b37 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
@@ -83,20 +83,11 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- ref: "broParser"
- - name: "withOutputFieldName"
- args:
- - "bro"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index 956e346..23fbc56 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -97,20 +97,11 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- ref: "broParser"
- - name: "withOutputFieldName"
- args:
- - "bro"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
index 3390dd7..e519715 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
@@ -3,36 +3,58 @@ config:
topology.workers: 1
components:
- - id: "fireEyeParser"
+ - id: "parser"
className: "org.apache.metron.parsing.parsers.BasicFireEyeParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "geoKeys"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - id: "jdbcConfig"
+ className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+ properties:
+ - name: "host"
+ value: "${mysql.ip}"
+ - name: "port"
+ value: ${mysql.port}
+ - name: "username"
+ value: "${mysql.username}"
+ - name: "password"
+ value: "${mysql.password}"
+ - name: "table"
+ value: "GEO"
- id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
- constructorArgs:
- - "${mysql.ip}"
- - ${mysql.port}
- - "${mysql.username}"
- - "${mysql.password}"
- - "GEO"
- - id: "hostsKeys"
- className: "java.util.ArrayList"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - name: "withJdbcConfig"
+ args:
+ - ref: "jdbcConfig"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "geo"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "geoEnrichmentAdapter"
- id: "hostEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
constructorArgs:
- '${org.apache.metron.enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "host"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "hostEnrichmentAdapter"
+ - id: "enrichments"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "geoEnrichment"
+ - name: "add"
+ args:
+ - ref: "hostEnrichment"
- id: "indexAdapter"
className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- id: "alertsConfig"
@@ -135,60 +157,14 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- - ref: "fireEyeParser"
- - name: "withOutputFieldName"
+ - ref: "parser"
+ - name: "withEnrichments"
args:
- - "fireeye"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["geo"]
- - name: "withAdapter"
- args:
- - ref: "geoEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["fireeye"]
- - name: "withKeys"
- args:
- - ref: "geoKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["host"]
- - name: "withAdapter"
- args:
- - ref: "hostEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["fireeye"]
- - name: "withKeys"
- args:
- - ref: "hostsKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
+ - ref: "enrichments"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
@@ -297,6 +273,36 @@ bolts:
- name: "withMetricConfiguration"
args:
- ref: "metricConfig"
+ - id: "geoEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "geoEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "hostEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "hostEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "joinBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
streams:
- name: "spout -> parser"
@@ -304,22 +310,43 @@ streams:
to: "parserBolt"
grouping:
type: SHUFFLE
+ - name: "parser -> host"
+ from: "parserBolt"
+ to: "hostEnrichmentBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
- name: "parser -> geo"
from: "parserBolt"
to: "geoEnrichmentBolt"
grouping:
+ streamId: "geo"
type: FIELDS
+ args: ["key"]
+ - name: "parser -> join"
+ from: "parserBolt"
+ to: "joinBolt"
+ grouping:
streamId: "message"
+ type: FIELDS
args: ["key"]
- - name: "geo -> host"
+ - name: "geo -> join"
from: "geoEnrichmentBolt"
- to: "hostEnrichmentBolt"
+ to: "joinBolt"
grouping:
+ streamId: "geo"
type: FIELDS
- streamId: "message"
args: ["key"]
- - name: "host -> alerts"
+ - name: "host -> join"
from: "hostEnrichmentBolt"
+ to: "joinBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
+ - name: "join -> alerts"
+ from: "joinBolt"
to: "alertsBolt"
grouping:
streamId: "message"
@@ -331,8 +358,8 @@ streams:
grouping:
streamId: "message"
type: SHUFFLE
- - name: "alerts -> indexing"
- from: "alertsBolt"
+ - name: "join -> indexing"
+ from: "joinBolt"
to: "indexingBolt"
grouping:
streamId: "message"
@@ -355,4 +382,4 @@ streams:
to: "errorIndexingBolt"
grouping:
streamId: "error"
- type: SHUFFLE
+ type: SHUFFLE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
index b759adf..494c233 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
@@ -3,36 +3,58 @@ config:
topology.workers: 1
components:
- - id: "fireEyeParser"
+ - id: "parser"
className: "org.apache.metron.parsing.parsers.BasicFireEyeParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "geoKeys"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - id: "jdbcConfig"
+ className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+ properties:
+ - name: "host"
+ value: "${mysql.ip}"
+ - name: "port"
+ value: ${mysql.port}
+ - name: "username"
+ value: "${mysql.username}"
+ - name: "password"
+ value: "${mysql.password}"
+ - name: "table"
+ value: "GEO"
- id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
- constructorArgs:
- - "${mysql.ip}"
- - ${mysql.port}
- - "${mysql.username}"
- - "${mysql.password}"
- - "GEO"
- - id: "hostsKeys"
- className: "java.util.ArrayList"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
configMethods:
- - name: "add"
- args: ["ip_src_addr"]
- - name: "add"
- args: ["ip_dst_addr"]
+ - name: "withJdbcConfig"
+ args:
+ - ref: "jdbcConfig"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "geo"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "geoEnrichmentAdapter"
- id: "hostEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
constructorArgs:
- '${org.apache.metron.enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "host"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "hostEnrichmentAdapter"
+ - id: "enrichments"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "geoEnrichment"
+ - name: "add"
+ args:
+ - ref: "hostEnrichment"
- id: "indexAdapter"
className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- id: "alertsConfig"
@@ -130,11 +152,11 @@ components:
# zookeeper hosts
- ref: "zkHosts"
# topic name
- - "${spout.kafka.topic.fireeye}"
+ - "${spout.kafka.topic.pcap}"
# zk root
- ""
# id
- - "${spout.kafka.topic.fireeye}"
+ - "${spout.kafka.topic.pcap}"
properties:
- name: "forceFromStart"
value: true
@@ -149,60 +171,14 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- - ref: "fireEyeParser"
- - name: "withOutputFieldName"
- args:
- - "fireeye"
- - name: "withMessageFilter"
+ - ref: "parser"
+ - name: "withEnrichments"
args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["geo"]
- - name: "withAdapter"
- args:
- - ref: "geoEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["fireeye"]
- - name: "withKeys"
- args:
- - ref: "geoKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["host"]
- - name: "withAdapter"
- args:
- - ref: "hostEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["fireeye"]
- - name: "withKeys"
- args:
- - ref: "hostsKeys"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
+ - ref: "enrichments"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
@@ -311,6 +287,36 @@ bolts:
- name: "withMetricConfiguration"
args:
- ref: "metricConfig"
+ - id: "geoEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "geoEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "hostEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "hostEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "joinBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
streams:
- name: "spout -> parser"
@@ -318,22 +324,43 @@ streams:
to: "parserBolt"
grouping:
type: SHUFFLE
+ - name: "parser -> host"
+ from: "parserBolt"
+ to: "hostEnrichmentBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
- name: "parser -> geo"
from: "parserBolt"
to: "geoEnrichmentBolt"
grouping:
+ streamId: "geo"
type: FIELDS
+ args: ["key"]
+ - name: "parser -> join"
+ from: "parserBolt"
+ to: "joinBolt"
+ grouping:
streamId: "message"
+ type: FIELDS
args: ["key"]
- - name: "geo -> host"
+ - name: "geo -> join"
from: "geoEnrichmentBolt"
- to: "hostEnrichmentBolt"
+ to: "joinBolt"
grouping:
+ streamId: "geo"
type: FIELDS
- streamId: "message"
args: ["key"]
- - name: "host -> alerts"
+ - name: "host -> join"
from: "hostEnrichmentBolt"
+ to: "joinBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
+ - name: "join -> alerts"
+ from: "joinBolt"
to: "alertsBolt"
grouping:
streamId: "message"
@@ -345,8 +372,8 @@ streams:
grouping:
streamId: "message"
type: SHUFFLE
- - name: "alerts -> indexing"
- from: "alertsBolt"
+ - name: "join -> indexing"
+ from: "joinBolt"
to: "indexingBolt"
grouping:
streamId: "message"
@@ -369,4 +396,4 @@ streams:
to: "errorIndexingBolt"
grouping:
streamId: "error"
- type: SHUFFLE
+ type: SHUFFLE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
index 9722413..fc02ac4 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
@@ -83,20 +83,11 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- ref: "iseParser"
- - name: "withOutputFieldName"
- args:
- - "ise"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
index d425269..88272ef 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
@@ -97,20 +97,11 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- ref: "iseParser"
- - name: "withOutputFieldName"
- args:
- - "ise"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
index f83924a..7df3163 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
@@ -3,25 +3,58 @@ config:
topology.workers: 1
components:
- - id: "lancopeParser"
+ - id: "parser"
className: "org.apache.metron.parsing.parsers.BasicLancopeParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "geoKeys"
+ - id: "jdbcConfig"
+ className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+ properties:
+ - name: "host"
+ value: "${mysql.ip}"
+ - name: "port"
+ value: ${mysql.port}
+ - name: "username"
+ value: "${mysql.username}"
+ - name: "password"
+ value: "${mysql.password}"
+ - name: "table"
+ value: "GEO"
+ - id: "geoEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
+ configMethods:
+ - name: "withJdbcConfig"
+ args:
+ - ref: "jdbcConfig"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "geo"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "geoEnrichmentAdapter"
+ - id: "hostEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+ constructorArgs:
+ - '${org.apache.metron.enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ properties:
+ - name: "name"
+ value: "host"
+ - name: "fields"
+ value: ["ip_src_addr", "ip_dst_addr"]
+ - name: "adapter"
+ ref: "hostEnrichmentAdapter"
+ - id: "enrichments"
className: "java.util.ArrayList"
configMethods:
- name: "add"
- args: ["ip_src_addr"]
+ args:
+ - ref: "geoEnrichment"
- name: "add"
- args: ["ip_dst_addr"]
- - id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
- constructorArgs:
- - "${mysql.ip}"
- - ${mysql.port}
- - "${mysql.username}"
- - "${mysql.password}"
- - "GEO"
+ args:
+ - ref: "hostEnrichment"
- id: "indexAdapter"
className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- id: "alertsConfig"
@@ -124,40 +157,14 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.parsing.TelemetryParserBolt"
+ className: "org.apache.metron.bolt.TelemetryParserBolt"
configMethods:
- name: "withMessageParser"
args:
- - ref: "lancopeParser"
- - name: "withOutputFieldName"
- args:
- - "lancope"
- - name: "withMessageFilter"
- args:
- - ref: "genericMessageFilter"
- - name: "withMetricConfig"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichmentTag"
- args: ["geo"]
- - name: "withAdapter"
- args:
- - ref: "geoEnrichmentAdapter"
- - name: "withMaxTimeRetain"
- args: [10]
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withOutputFieldName"
- args: ["lancope"]
- - name: "withKeys"
- args:
- - ref: "geoKeys"
- - name: "withMetricConfiguration"
+ - ref: "parser"
+ - name: "withEnrichments"
args:
- - ref: "metricConfig"
+ - ref: "enrichments"
- id: "indexingBolt"
className: "org.apache.metron.indexing.TelemetryIndexingBolt"
configMethods:
@@ -266,6 +273,36 @@ bolts:
- name: "withMetricConfiguration"
args:
- ref: "metricConfig"
+ - id: "geoEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "geoEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "hostEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "hostEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "joinBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
streams:
- name: "spout -> parser"
@@ -273,19 +310,47 @@ streams:
to: "parserBolt"
grouping:
type: SHUFFLE
+ - name: "parser -> host"
+ from: "parserBolt"
+ to: "hostEnrichmentBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
- name: "parser -> geo"
from: "parserBolt"
to: "geoEnrichmentBolt"
grouping:
+ streamId: "geo"
type: FIELDS
+ args: ["key"]
+ - name: "parser -> join"
+ from: "parserBolt"
+ to: "joinBolt"
+ grouping:
streamId: "message"
+ type: FIELDS
args: ["key"]
- - name: "geo -> alerts"
+ - name: "geo -> join"
from: "geoEnrichmentBolt"
- to: "alertsBolt"
+ to: "joinBolt"
+ grouping:
+ streamId: "geo"
+ type: FIELDS
+ args: ["key"]
+ - name: "host -> join"
+ from: "hostEnrichmentBolt"
+ to: "joinBolt"
grouping:
+ streamId: "host"
type: FIELDS
+ args: ["key"]
+ - name: "join -> alerts"
+ from: "joinBolt"
+ to: "alertsBolt"
+ grouping:
streamId: "message"
+ type: FIELDS
args: ["key"]
- name: "alerts -> alertsIndexing"
from: "alertsBolt"
@@ -293,8 +358,8 @@ streams:
grouping:
streamId: "message"
type: SHUFFLE
- - name: "alerts -> indexing"
- from: "alertsBolt"
+ - name: "join -> indexing"
+ from: "joinBolt"
to: "indexingBolt"
grouping:
streamId: "message"
@@ -317,4 +382,4 @@ streams:
to: "errorIndexingBolt"
grouping:
streamId: "error"
- type: SHUFFLE
+ type: SHUFFLE
\ No newline at end of file