You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/10 21:17:41 UTC
[3/4] incubator-metron git commit: METRON-33 Execute Enrichments in
Parallel (merrimanr via cestella) closes apache/incubator-metron#19
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java
new file mode 100644
index 0000000..067e4b4
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java
@@ -0,0 +1,175 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.domain.Enrichment;
+import org.apache.metron.filters.GenericMessageFilter;
+import org.apache.metron.helpers.topology.ErrorGenerator;
+import org.apache.metron.parser.interfaces.MessageFilter;
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Bolt that parses raw telemetry bytes into JSON messages, keeps the ones
+ * accepted by the configured filter and splits each message into one JSON
+ * object per configured enrichment.
+ */
+public class TelemetryParserBolt extends
+    SplitBolt<JSONObject> {
+
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(TelemetryParserBolt.class);
+
+  protected MessageParser<JSONObject> parser;
+  protected MessageFilter<JSONObject> filter = new GenericMessageFilter();
+  protected List<Enrichment> enrichments = new ArrayList<>();
+
+  /**
+   * @param parser The parser class for parsing the incoming raw message byte
+   *               stream
+   * @return Instance of this class
+   */
+  public TelemetryParserBolt withMessageParser(MessageParser<JSONObject>
+      parser) {
+    this.parser = parser;
+    return this;
+  }
+
+  /**
+   * @param filter A class for filtering/dropping incoming telemetry messages
+   * @return Instance of this class
+   */
+  public TelemetryParserBolt withMessageFilter(MessageFilter<JSONObject>
+      filter) {
+    this.filter = filter;
+    return this;
+  }
+
+  /**
+   * @param enrichments Enrichments whose names are used as stream ids and
+   *                    whose field lists drive {@link #splitMessage}
+   * @return Instance of this class
+   */
+  public TelemetryParserBolt withEnrichments(List<Enrichment> enrichments) {
+    this.enrichments = enrichments;
+    return this;
+  }
+
+  /**
+   * Checks that a parser has been configured and initializes it.
+   *
+   * @throws IllegalStateException if no parser has been set
+   */
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    LOG.info("[Metron] Preparing TelemetryParser Bolt...");
+    if (this.parser == null) {
+      throw new IllegalStateException("MessageParser must be specified");
+    }
+    parser.init();
+  }
+
+  /**
+   * @return A newly generated random UUID string; neither argument is used
+   */
+  @Override
+  public String getKey(Tuple tuple, JSONObject message) {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * @return The names of all configured enrichments
+   */
+  @Override
+  public Set<String> getStreamIds() {
+    Set<String> streamIds = new HashSet<>();
+    for (Enrichment enrichment : enrichments) {
+      streamIds.add(enrichment.getName());
+    }
+    return streamIds;
+  }
+
+  /**
+   * Parses the raw bytes held in field 0 of the tuple, validates every parsed
+   * message and collects those accepted by the filter. On any failure the
+   * tuple is failed and an error message is emitted on the "error" stream.
+   *
+   * @param tuple Incoming tuple whose first field holds the raw message bytes
+   * @return Messages that passed validation and filtering before any failure
+   */
+  @Override
+  public List<JSONObject> generateMessages(Tuple tuple) {
+    List<JSONObject> filteredMessages = new ArrayList<>();
+    byte[] originalMessage = null;
+    try {
+      originalMessage = tuple.getBinary(0);
+      if (originalMessage == null || originalMessage.length == 0) {
+        throw new Exception("Invalid message length");
+      }
+      List<JSONObject> messages = parser.parse(originalMessage);
+      for (JSONObject message : messages) {
+        if (!parser.validate(message)) {
+          throw new Exception("Message validation failed: "
+              + message);
+        }
+        if (filter != null && filter.emitTuple(message)) {
+          filteredMessages.add(message);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to parse telemetry message", e);
+      collector.fail(tuple);
+      // originalMessage can still be null here (null payload or getBinary
+      // failed); decoding it unguarded would throw a NullPointerException
+      // from inside this handler and the error tuple would never be emitted.
+      String rawMessage = originalMessage == null ? "null"
+          : new String(originalMessage,
+              java.nio.charset.StandardCharsets.UTF_8);
+      JSONObject error = ErrorGenerator.generateErrorMessage(
+          "Parsing problem: " + rawMessage, e);
+      collector.emit("error", new Values(error));
+    }
+    return filteredMessages;
+  }
+
+  /**
+   * Builds, for every enrichment that declares at least one field, a JSON
+   * object holding those fields copied from the message.
+   *
+   * @param message Parsed message to split
+   * @return Map from enrichment name to the JSON object built for it
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, JSONObject> splitMessage(JSONObject message) {
+    Map<String, JSONObject> streamMessageMap = new HashMap<>();
+    for (Enrichment enrichment : enrichments) {
+      List<String> fields = enrichment.getFields();
+      if (fields != null && fields.size() > 0) {
+        JSONObject enrichmentObject = new JSONObject();
+        for (String field : fields) {
+          enrichmentObject.put(field, message.get(field));
+        }
+        streamMessageMap.put(enrichment.getName(), enrichmentObject);
+      }
+    }
+    return streamMessageMap;
+  }
+
+  /** No-op in this bolt. */
+  @Override
+  public void declareOther(OutputFieldsDeclarer declarer) {
+
+  }
+
+  /** No-op in this bolt. */
+  @Override
+  public void emitOther(Tuple tuple, List<JSONObject> messages) {
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
index 3a86957..f04bb1f 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
@@ -1,16 +1,16 @@
package org.apache.metron.filters;
+import org.apache.commons.configuration.Configuration;
+import org.apache.metron.parser.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-import org.apache.metron.parser.interfaces.MessageFilter;
-
-public class BroMessageFilter implements MessageFilter,Serializable {
+public class BroMessageFilter implements MessageFilter<JSONObject>,
+ Serializable {
/**
* Filter protocols based on whitelists and blacklists
@@ -21,14 +21,14 @@ public class BroMessageFilter implements MessageFilter,Serializable {
private final Set<String> _known_protocols;
/**
- * @param filter Commons configuration for reading properties files
+ * @param conf Commons configuration for reading properties files
* @param key Key in a JSON mesage where the protocol field is located
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public BroMessageFilter(Configuration conf, String key) {
_key = key;
- _known_protocols = new HashSet<String>();
+ _known_protocols = new HashSet<>();
List known_protocols = conf.getList("source.known.protocols");
_known_protocols.addAll(known_protocols);
}
@@ -39,6 +39,7 @@ public class BroMessageFilter implements MessageFilter,Serializable {
*/
public boolean emitTuple(JSONObject message) {
- return _known_protocols.contains(message.get(_key));
+ String protocol = (String) message.get(_key);
+ return _known_protocols.contains(protocol);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
index f136b2b..29413b7 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
@@ -1,15 +1,13 @@
package org.apache.metron.filters;
-import java.io.Serializable;
+import org.apache.metron.parser.interfaces.MessageFilter;
import org.json.simple.JSONObject;
-import org.apache.metron.parser.interfaces.MessageFilter;
+import java.io.Serializable;
-public class GenericMessageFilter implements MessageFilter,Serializable {
+public class GenericMessageFilter implements MessageFilter<JSONObject>,
+ Serializable {
- /**
- *
- */
private static final long serialVersionUID = 3626397212398318852L;
public boolean emitTuple(JSONObject message) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java
deleted file mode 100644
index 5bec434..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsing;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-
-import com.codahale.metrics.Counter;
-import org.apache.metron.metrics.MetricReporter;
-import org.apache.metron.parser.interfaces.MessageFilter;
-import org.apache.metron.parser.interfaces.MessageParser;
-
-@SuppressWarnings("rawtypes")
-public abstract class AbstractParserBolt extends BaseRichBolt {
- /**
- *
- */
- private static final long serialVersionUID = -6710596708304282838L;
-
- protected static final Logger LOG = LoggerFactory
- .getLogger(AbstractParserBolt.class);
-
- protected OutputCollector _collector;
- protected MessageParser _parser;
-
- protected String OutputFieldName;
- protected MetricReporter _reporter;
- protected MessageFilter _filter;
-
- protected Counter ackCounter, emitCounter, failCounter;
-
- /**
- * Register counters to be reported to graphite
- * */
-
- protected void registerCounters() {
-
- String ackString = _parser.getClass().getSimpleName() + ".ack";
-
- String emitString = _parser.getClass().getSimpleName() + ".emit";
-
- String failString = _parser.getClass().getSimpleName() + ".fail";
-
- ackCounter = _reporter.registerCounter(ackString);
- emitCounter = _reporter.registerCounter(emitString);
- failCounter = _reporter.registerCounter(failString);
-
- }
-
- /**
- * Check to make sure all required variables have been initialized
- * */
-
- public final void prepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) {
- _collector = collector;
- if (this._parser == null)
- throw new IllegalStateException("MessageParser must be specified");
- if (this.OutputFieldName == null)
- throw new IllegalStateException("OutputFieldName must be specified");
-
- if (this._filter == null)
- throw new IllegalStateException("MessageFilter must be specified");
-
- try {
- doPrepare(conf, topologyContext, collector);
- } catch (IOException e) {
- LOG.error("Counld not initialize...");
- e.printStackTrace();
- }
- }
-
- /**
- * @param parser
- * The parser class for parsing the incoming raw message byte
- * stream
- * @return Instance of this class
- */
-
- public boolean checkForSchemaCorrectness(JSONObject message) {
- int correct = 0;
-
-
- if (!(message.containsKey("original_string"))) {
- LOG.trace("[Metron] Message does not have original_string: " + message);
- return false;
- } else if (!(message.containsKey("timestamp"))) {
- LOG.trace("[Metron] Message does not have timestamp: " + message);
- return false;
- } else {
- LOG.trace("[Metron] Message conforms to schema: "
- + message);
- return true;
- }
- }
-
- abstract void doPrepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) throws IOException;
-
- protected String generateTopologyKey(String src_ip, String dst_ip)
- throws Exception {
- try {
- if (dst_ip == null && src_ip == null)
- return "0";
-
- if (src_ip == null || src_ip.length() == 0)
- return dst_ip;
-
- if (dst_ip == null || dst_ip.length() == 0)
- return src_ip;
-
- double ip1 = Double.parseDouble(src_ip.replace(".", ""));
- double ip2 = Double.parseDouble(dst_ip.replace(".", ""));
-
- return String.valueOf(ip1 + ip2);
- } catch (Exception e) {
- return "0";
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java
deleted file mode 100644
index 6f692d6..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package org.apache.metron.parsing;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import org.apache.metron.helpers.topology.ErrorGenerator;
-import org.apache.metron.parsing.parsers.PcapParser;
-import org.apache.metron.pcap.PacketInfo;
-
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-
-
-/**
- * The Class PcapParserBolt parses each input tuple and emits a new tuple which
- * contains the information (header_json,group_key,pcap_id, timestamp, pcap) as
- * defined in the output schema.
- *
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public class PcapParserBolt implements IRichBolt {
-
- /** The Constant serialVersionUID. */
- private static final long serialVersionUID = -1449830233777209255L;
-
- /** The Constant LOG. */
- private static final Logger LOG = Logger.getLogger(PcapParserBolt.class);
-
- /** The collector. */
- private OutputCollector collector = null;
-
- /** The conf. */
- @SuppressWarnings("rawtypes")
-private Map conf;
-
- /** The number of chars to use for shuffle grouping. */
- @SuppressWarnings("unused")
-private int numberOfCharsToUseForShuffleGrouping = 4;
-
- /** The divisor to convert nanos to expected time precision. */
- private long timePrecisionDivisor = 1L;
-
-
- // HBaseStreamPartitioner hBaseStreamPartitioner = null ;
-
- /**
- * The Constructor.
- */
- public PcapParserBolt() {
-
- }
-
- public PcapParserBolt withTsPrecision(String tsPrecision) {
- if (tsPrecision.equalsIgnoreCase("MILLI")) {
- //Convert nanos to millis
- LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
- timePrecisionDivisor = 1000000L;
- } else if (tsPrecision.equalsIgnoreCase("MICRO")) {
- //Convert nanos to micro
- LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
- timePrecisionDivisor = 1000L;
- } else if (tsPrecision.equalsIgnoreCase("NANO")) {
- //Keep nano as is.
- LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
- timePrecisionDivisor = 1L;
- } else {
- LOG.info("bolt.parser.ts.precision not set. Default to NANO");
- timePrecisionDivisor = 1L;
- }
- return this;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm
- * .topology.OutputFieldsDeclarer)
- */
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("message", new Fields("key", "message"));
- //declarer.declareStream("pcap_index_stream", new Fields("index_json", "pcap_id"));
- declarer.declareStream("pcap_header_stream", new Fields("header_json", "pcap_id"));
- declarer.declareStream("pcap_data_stream", new Fields("pcap_id", "timestamp", "pcap"));
- declarer.declareStream("error", new Fields("error"));
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see backtype.storm.topology.IComponent#getComponentConfiguration()
- */
- /**
- * Method getComponentConfiguration.
- *
- *
- *
- * @return Map<String,Object> * @see
- * backtype.storm.topology.IComponent#getComponentConfiguration() * @see
- * backtype.storm.topology.IComponent#getComponentConfiguration() * @see
- * backtype.storm.topology.IComponent#getComponentConfiguration()
- */
-
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see backtype.storm.task.IBolt#prepare(java.util.Map,
- * backtype.storm.task.TopologyContext, backtype.storm.task.OutputCollector)
- */
-
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.conf = stormConf;
- if (conf.containsKey("bolt.parser.num.of.key.chars.to.use.for.shuffle.grouping")) {
- this.numberOfCharsToUseForShuffleGrouping = Integer.valueOf(conf.get(
- "bolt.parser.num.of.key.chars.to.use.for.shuffle.grouping").toString());
- }
-
- Grouping._Fields a;
-
-
- // hBaseStreamPartitioner = new HBaseStreamPartitioner(
- // conf.get("bolt.hbase.table.name").toString(),
- // 0,
- // Integer.parseInt(conf.get("bolt.hbase.partitioner.region.info.refresh.interval.mins").toString()))
- // ;
- // hBaseStreamPartitioner.prepare();
-
- }
-
- /**
- * Processes each input tuple and emits tuple which holds the following
- * information about a network packet : group_key : first 3 digits of the
- * pcap_id pcap_id : generated from network packet srcIp, dstIp, protocol,
- * srcPort, dstPort header_json : contains global header, ipv4 header, tcp
- * header(if the n/w protocol is tcp), udp header (if the n/w protocol is udp)
- * timestamp : the n/w packet capture timestamp pcap : tuple in binary array.
- *
- * @param input
- * Tuple
- * @see backtype.storm.task.IBolt#execute(Tuple)
- */
-
- @SuppressWarnings("unchecked")
-public void execute(Tuple input) {
-
- // LOG.debug("In PcapParserBolt bolt: Got tuple " + input);
- // LOG.debug("Got this pcap : " + new String(input.getBinary(0)));
-
- List<PacketInfo> packetInfoList = null;
- try {
- packetInfoList = PcapParser.parse(input.getBinary(0));
-
- if (packetInfoList != null) {
-
- for (PacketInfo packetInfo : packetInfoList) {
-
-
- String string_pcap = packetInfo.getJsonIndexDoc();
- Object obj=JSONValue.parse(string_pcap);
- JSONObject header=(JSONObject)obj;
-
- JSONObject message = new JSONObject();
- //message.put("key", packetInfo.getKey());
-
- message.put("message", header);
-
- collector.emit("message", new Values(packetInfo.getKey(), message));
-
- //collector.emit("pcap_index_stream", new Values(packetInfo.getJsonIndexDoc(), packetInfo.getKey()));
-
- collector.emit("pcap_header_stream", new Values(packetInfo.getJsonDoc(), packetInfo.getKey()));
- collector.emit("pcap_data_stream", new Values(packetInfo.getKey(),
- packetInfo.getPacketTimeInNanos() / timePrecisionDivisor,
- input.getBinary(0)));
-
- // collector.emit(new Values(packetInfo.getJsonDoc(), packetInfo
- // .getKey().substring(0, numberOfCharsToUseForShuffleGrouping),
- // packetInfo.getKey(), (packetInfo.getPacketHeader().getTsSec()
- // * secMultiplier + packetInfo.getPacketHeader().getTsUsec()
- // * microSecMultiplier), input.getBinary(0)));
- }
- }
-
- } catch (Exception e) {
- collector.fail(input);
- e.printStackTrace();
- LOG.error("Exception while processing tuple", e);
-
-
- JSONObject error = ErrorGenerator.generateErrorMessage(
- "Alerts problem: " + input.getBinary(0), e);
- collector.emit("error", new Values(error));
-
- return;
- }
- collector.ack(input);
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see backtype.storm.task.IBolt#cleanup()
- */
-
- public void cleanup() {
- // TODO Auto-generated method stub
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java
deleted file mode 100644
index 8865d89..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsing;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.helpers.topology.ErrorGenerator;
-import org.apache.metron.json.serialization.JSONEncoderHelper;
-import org.apache.metron.metrics.MetricReporter;
-import org.apache.metron.parser.interfaces.MessageFilter;
-import org.apache.metron.parser.interfaces.MessageParser;
-
-/**
- * Uses an adapter to parse a telemetry message from its native format into a
- * standard JSON. For a list of available adapter please check
- * org.apache.metron.parser.parsers. The input is a raw byte array and the output is a
- * JSONObject
- * <p>
- * The parsing conventions are as follows:
- * <p>
- * <ul>
- *
- * <li>ip_src_addr = source ip of a message
- * <li>ip_dst_addr = destination ip of a message
- * <li>ip_src_port = source port of a message
- * <li>ip_dst_port = destination port of a message
- * <li>protocol = protocol of a message
- * <ul>
- * <p>
- * <p>
- * If a message does not contain at least one of these variables it will be
- * failed
- **/
-
-@SuppressWarnings("rawtypes")
-public class TelemetryParserBolt extends AbstractParserBolt {
-
- private static final long serialVersionUID = -2647123143398352020L;
- private JSONObject metricConfiguration;
-
- /**
- * @param parser
- * The parser class for parsing the incoming raw message byte
- * stream
- * @return Instance of this class
- */
-
- public TelemetryParserBolt withMessageParser(MessageParser parser) {
- _parser = parser;
- return this;
- }
-
- /**
- * @param OutputFieldName
- * Field name of the output tuple
- * @return Instance of this class
- */
-
- public TelemetryParserBolt withOutputFieldName(String OutputFieldName) {
- this.OutputFieldName = OutputFieldName;
- return this;
- }
-
- /**
- * @param filter
- * A class for filtering/dropping incomming telemetry messages
- * @return Instance of this class
- */
-
- public TelemetryParserBolt withMessageFilter(MessageFilter filter) {
- this._filter = filter;
- return this;
- }
-
- /**
- * @param config
- * A class for generating custom metrics into graphite
- * @return Instance of this class
- */
-
- public TelemetryParserBolt withMetricConfig(Configuration config) {
- this.metricConfiguration = JSONEncoderHelper.getJSON(config
- .subset("org.apache.metron.metrics"));
- return this;
- }
-
- @Override
- void doPrepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) throws IOException {
-
- LOG.info("[Metron] Preparing TelemetryParser Bolt...");
-
- if (metricConfiguration != null) {
- _reporter = new MetricReporter();
- _reporter
- .initialize(metricConfiguration, TelemetryParserBolt.class);
- LOG.info("[Metron] Metric reporter is initialized");
- } else {
- LOG.info("[Metron] Metric reporter is not initialized");
- }
- this.registerCounters();
-
- if(_parser != null)
- _parser.init();
-
-
- }
-
- @SuppressWarnings("unchecked")
- public void execute(Tuple tuple) {
-
- LOG.trace("[Metron] Starting to process a new incoming tuple");
-
- byte[] original_message = null;
-
- try {
-
- original_message = tuple.getBinary(0);
-
- LOG.trace("[Metron] Starting the parsing process");
-
- if (original_message == null || original_message.length == 0) {
- LOG.error("Incomming tuple is null");
- throw new Exception("Invalid message length");
- }
-
- LOG.trace("[Metron] Attempting to transofrm binary message to JSON");
- JSONObject transformed_message = _parser.parse(original_message);
- LOG.debug("[Metron] Transformed Telemetry message: "
- + transformed_message);
-
- if (transformed_message == null || transformed_message.isEmpty())
- throw new Exception("Unable to turn binary message into a JSON");
-
- LOG.trace("[Metron] Checking if the transformed JSON conforms to the right schema");
-
- if (!checkForSchemaCorrectness(transformed_message)) {
- throw new Exception("Incorrect formatting on message: "
- + transformed_message);
- }
-
- else {
- LOG.trace("[Metron] JSON message has the right schema");
- boolean filtered = false;
-
- if (_filter != null) {
- if (!_filter.emitTuple(transformed_message)) {
- filtered = true;
- }
- }
-
- if (!filtered) {
- String ip1 = null;
-
- if (transformed_message.containsKey("ip_src_addr"))
- ip1 = transformed_message.get("ip_src_addr").toString();
-
- String ip2 = null;
-
- if (transformed_message.containsKey("ip_dst_addr"))
- ip2 = transformed_message.get("ip_dst_addr").toString();
-
- String key = generateTopologyKey(ip1, ip2);
-
- JSONObject new_message = new JSONObject();
- new_message.put("message", transformed_message);
- _collector.emit("message", new Values(key, new_message));
- }
-
- _collector.ack(tuple);
- if (metricConfiguration != null)
- ackCounter.inc();
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- LOG.error("Failed to parse telemetry message :" + original_message);
- _collector.fail(tuple);
-
- if (metricConfiguration != null)
- failCounter.inc();
-
- JSONObject error = ErrorGenerator.generateErrorMessage(
- "Parsing problem: " + new String(original_message),
- e);
- _collector.emit("error", new Values(error));
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declearer) {
- declearer.declareStream("message", new Fields("key", "message"));
- declearer.declareStream("error", new Fields("message"));
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java
deleted file mode 100644
index bf8c596..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsing.parsers;
-
-import java.io.Serializable;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.parser.interfaces.MessageParser;
-import org.apache.metron.parsing.AbstractParserBolt;
-
-@SuppressWarnings("serial")
-public abstract class AbstractParser implements MessageParser, Serializable {
-
- protected static final Logger _LOG = LoggerFactory
- .getLogger(AbstractParserBolt.class);
-
- public void initializeParser() {
- _LOG.debug("Initializing adapter...");
-
-
- }
-
- public void init() {
-
- }
-
-
- abstract public JSONObject parse(byte[] raw_message);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
index 870991c..2b8b955 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
@@ -22,25 +22,33 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
@SuppressWarnings("serial")
-public class BasicBroParser extends AbstractParser {
+public class BasicBroParser extends BasicParser {
protected static final Logger _LOG = LoggerFactory
.getLogger(BasicBroParser.class);
private JSONCleaner cleaner = new JSONCleaner();
+ @Override
+ public void init() {
+
+ }
+
@SuppressWarnings("unchecked")
- public JSONObject parse(byte[] msg) {
+ public List<JSONObject> parse(byte[] msg) {
_LOG.trace("[Metron] Starting to parse incoming message");
String rawMessage = null;
-
+ List<JSONObject> messages = new ArrayList<>();
try {
rawMessage = new String(msg, "UTF-8");
_LOG.trace("[Metron] Received message: " + rawMessage);
- JSONObject cleanedMessage = cleaner.Clean(rawMessage);
+ JSONObject cleanedMessage = cleaner.clean(rawMessage);
_LOG.debug("[Metron] Cleaned message: " + cleanedMessage);
if (cleanedMessage == null || cleanedMessage.isEmpty()) {
@@ -77,9 +85,10 @@ public class BasicBroParser extends AbstractParser {
replaceKey(payload, "timestamp", new String[]{ "ts" });
+ long timestamp = 0L;
if (payload.containsKey("timestamp")) {
try {
- long timestamp = Long.parseLong(payload.get("timestamp").toString());
+ timestamp = Long.parseLong(payload.get("timestamp").toString());
payload.put("timestamp", timestamp);
} catch (NumberFormatException nfe) {
_LOG.error(String.format("[Metron] timestamp is invalid: %s", payload.get("timestamp")));
@@ -102,8 +111,8 @@ public class BasicBroParser extends AbstractParser {
payload.put("protocol", key);
_LOG.debug("[Metron] Returning parsed message: " + payload);
-
- return payload;
+ messages.add(payload);
+ return messages;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
index dbced1c..b5d1293 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
@@ -1,36 +1,28 @@
package org.apache.metron.parsing.parsers;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.parsing.utils.ParserUtils;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.text.ParseException;
-import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.json.simple.JSONObject;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+public class BasicFireEyeParser extends BasicParser {
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import oi.thekraken.grok.api.exception.GrokException;
+ private static final long serialVersionUID = 6328907550159134550L;
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(BasicFireEyeParser.class);
-public class BasicFireEyeParser extends AbstractParser implements Serializable {
- private static final long serialVersionUID = 6328907550159134550L;
- //String tsRegex = "(.*)([a-z][A-Z]+)\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)(.*)$";
String tsRegex ="([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)";
@@ -49,9 +41,14 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
}
@Override
- public JSONObject parse(byte[] raw_message) {
- String toParse = "";
+ public void init() { // No-op: this parser does no work in init().
+ }
+
+ @Override
+ public List<JSONObject> parse(byte[] raw_message) {
+ String toParse = "";
+ List<JSONObject> messages = new ArrayList<>();
try {
toParse = new String(raw_message, "UTF-8");
@@ -80,8 +77,8 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
JSONObject toReturn = parseMessage(toParse);
toReturn.put("timestamp", getTimeStamp(toParse,delimiter));
-
- return toReturn;
+ messages.add(toReturn);
+ return messages;
} catch (Exception e) {
e.printStackTrace();
@@ -90,37 +87,6 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
}
- public static Long convertToEpoch(String m, String d, String ts,
- boolean adjust_timezone) throws ParseException {
- d = d.trim();
-
- if (d.length() <= 2)
- d = "0" + d;
-
- Date date = new SimpleDateFormat("MMM", Locale.ENGLISH).parse(m);
- Calendar cal = Calendar.getInstance();
- cal.setTime(date);
- String month = String.valueOf(cal.get(Calendar.MONTH));
- int year = Calendar.getInstance().get(Calendar.YEAR);
-
- if (month.length() <= 2)
- month = "0" + month;
-
- String coglomerated_ts = year + "-" + month + "-" + d + " " + ts;
-
- System.out.println(coglomerated_ts);
-
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- if (adjust_timezone)
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- date = sdf.parse(coglomerated_ts);
- long timeInMillisSinceEpoch = date.getTime();
-
- return timeInMillisSinceEpoch;
- }
-
private long getTimeStamp(String toParse,String delimiter) throws ParseException {
long ts = 0;
@@ -134,8 +100,8 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
time = tsMatcher.group(3);
} else {
- _LOG.warn("Unable to find timestamp in message: " + toParse);
- ts = convertToEpoch(month, day, time, true);
+ LOG.warn("Unable to find timestamp in message: " + toParse);
+ ts = ParserUtils.convertToEpoch(month, day, time, true);
}
return ts;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
index 416491d..d4cdbe5 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
@@ -17,34 +17,41 @@
package org.apache.metron.parsing.parsers;
-import java.io.StringReader;
-
+import com.esotericsoftware.minlog.Log;
+import org.apache.metron.ise.parser.ISEParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.esotericsoftware.minlog.Log;
-import org.apache.metron.ise.parser.ISEParser;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
@SuppressWarnings("serial")
-public class BasicIseParser extends AbstractParser {
+public class BasicIseParser extends BasicParser {
- protected static final Logger _LOG = LoggerFactory
+ private static final Logger _LOG = LoggerFactory
.getLogger(BasicIseParser.class);
static final transient ISEParser _parser = new ISEParser("header=");
+ @Override
+ public void init() {
+   // No-op: this parser does no work in init().
+ }
+
@SuppressWarnings("unchecked")
- public JSONObject parse(byte[] msg) {
+ @Override
+ public List<JSONObject> parse(byte[] msg) {
String raw_message = "";
-
+ List<JSONObject> messages = new ArrayList<>();
try {
raw_message = new String(msg, "UTF-8");
_LOG.debug("Received message: " + raw_message);
-
+
/*
- * Reinitialize Parser. It has the effect of calling the constructor again.
+ * Reinitialize Parser. It has the effect of calling the constructor again.
*/
_parser.ReInit(new StringReader("header=" + raw_message.trim()));
@@ -54,7 +61,7 @@ public class BasicIseParser extends AbstractParser {
String ip_src_port = (String) payload.get("Device Port");
String ip_dst_addr = (String) payload.get("DestinationIPAddress");
String ip_dst_port = (String) payload.get("DestinationPort");
-
+
/*
* Standard Fields for Metron.
*/
@@ -67,11 +74,8 @@ public class BasicIseParser extends AbstractParser {
payload.put("ip_dst_addr", ip_dst_addr);
if(ip_dst_port != null)
payload.put("ip_dst_port", ip_dst_port);
-
- JSONObject message = new JSONObject();
- //message.put("message", payload);
-
- return payload;
+ messages.add(payload);
+ return messages;
} catch (Exception e) {
Log.error(e.toString());
@@ -80,5 +84,10 @@ public class BasicIseParser extends AbstractParser {
return null;
}
+ @Override
+ public boolean validate(JSONObject message) {
+ return true; // Accepts every message; overrides the field checks in BasicParser.validate.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
index ab2a508..b8be87f 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
@@ -17,23 +17,35 @@
package org.apache.metron.parsing.parsers;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
@SuppressWarnings("serial")
-public class BasicLancopeParser extends AbstractParser {
+public class BasicLancopeParser extends BasicParser {
// Sample Lancope Message
// {"message":"<131>Jul 17 15:59:01 smc-01 StealthWatch[12365]: 2014-07-17T15:58:30Z 10.40.10.254 0.0.0.0 Minor High Concern Index The host's concern index has either exceeded the CI threshold or rapidly increased. Observed 36.55M points. Policy maximum allows up to 20M points.","@version":"1","@timestamp":"2014-07-17T15:56:05.992Z","type":"syslog","host":"10.122.196.201"}
- @SuppressWarnings("unchecked")
+ private static final Logger _LOG = LoggerFactory.getLogger(BasicLancopeParser
+ .class);
+
@Override
- public JSONObject parse(byte[] msg) {
+ public void init() {
- JSONObject payload = null;
+ }
+ //@SuppressWarnings("unchecked")
+ @Override
+ public List<JSONObject> parse(byte[] msg) {
+
+ JSONObject payload = null;
+ List<JSONObject> messages = new ArrayList<>();
try {
String raw_message = new String(msg, "UTF-8");
@@ -56,13 +68,15 @@ public class BasicLancopeParser extends AbstractParser {
Date date;
date = formatter.parse(fixed_date);
- payload.put("timestamp", date.getTime());
+ long timestamp = date.getTime();
+ payload.put("timestamp", timestamp);
payload.remove("@timestamp");
payload.remove("message");
payload.put("original_string", message);
- return payload;
+ messages.add(payload);
+ return messages;
} catch (Exception e) {
_LOG.error("Unable to parse message: " + payload.toJSONString());
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
index 39fe207..32b7433 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
@@ -1,17 +1,22 @@
package org.apache.metron.parsing.parsers;
-import java.io.UnsupportedEncodingException;
-import java.text.SimpleDateFormat;
-
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-public class BasicLogstashParser extends AbstractParser {
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BasicLogstashParser extends BasicParser {
@Override
- public JSONObject parse(byte[] raw_message) {
-
+ public void init() {
+   // No-op: this parser does no work in init().
+ }
+
+ @Override
+ public List<JSONObject> parse(byte[] raw_message) {
+ List<JSONObject> messages = new ArrayList<>();
try {
/*
@@ -38,9 +43,10 @@ public class BasicLogstashParser extends AbstractParser {
rawJson = mutate(rawJson, "src_ip", "ip_src_addr");
// convert timestamp to milli since epoch
- rawJson.put("timestamp", LogstashToEpoch((String) rawJson.remove("@timestamp")));
-
- return rawJson;
+ long timestamp = LogstashToEpoch((String) rawJson.remove("@timestamp"));
+ rawJson.put("timestamp", timestamp);
+ messages.add(rawJson);
+ return messages;
} catch (Exception e) {
e.printStackTrace();
return null;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
index dd87130..ac105d7 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
@@ -2,183 +2,190 @@ package org.apache.metron.parsing.parsers;
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BasicPaloAltoFirewallParser extends BasicParser {
+
+ private static final Logger _LOG = LoggerFactory.getLogger
+ (BasicPaloAltoFirewallParser.class);
+
+ private static final long serialVersionUID = 3147090149725343999L;
+ public static final String PaloAltoDomain = "palo_alto_domain";
+ public static final String ReceiveTime = "receive_time";
+ public static final String SerialNum = "serial_num";
+ public static final String Type = "type";
+ public static final String ThreatContentType = "threat_content_type";
+ public static final String ConfigVersion = "config_version";
+ public static final String GenerateTime = "generate_time";
+ public static final String SourceAddress = "source_address";
+ public static final String DestinationAddress = "destination_address";
+ public static final String NATSourceIP = "nat_source_ip";
+ public static final String NATDestinationIP = "nat_destination_ip";
+ public static final String Rule = "rule";
+ public static final String SourceUser = "source_user";
+ public static final String DestinationUser = "destination_user";
+ public static final String Application = "application";
+ public static final String VirtualSystem = "virtual_system";
+ public static final String SourceZone = "source_zone";
+ public static final String DestinationZone = "destination_zone";
+ public static final String InboundInterface = "inbound_interface";
+ public static final String OutboundInterface = "outbound_interface";
+ public static final String LogAction = "log_action";
+ public static final String TimeLogged = "time_logged";
+ public static final String SessionID = "session_id";
+ public static final String RepeatCount = "repeat_count";
+ public static final String SourcePort = "source_port";
+ public static final String DestinationPort = "destination_port";
+ public static final String NATSourcePort = "nats_source_port";
+ public static final String NATDestinationPort = "nats_destination_port";
+ public static final String Flags = "flags";
+ public static final String IPProtocol = "ip_protocol";
+ public static final String Action = "action";
+
+ //Threat
+ public static final String URL = "url";
+ public static final String HOST = "host";
+ public static final String ThreatContentName = "threat_content_name";
+ public static final String Category = "category";
+ public static final String Direction = "direction";
+ public static final String Seqno = "seqno";
+ public static final String ActionFlags = "action_flags";
+ public static final String SourceCountry = "source_country";
+ public static final String DestinationCountry = "destination_country";
+ public static final String Cpadding = "cpadding";
+ public static final String ContentType = "content_type";
+
+ //Traffic -- every field needs its own key; a shared key makes each later put() overwrite the earlier value
+ public static final String Bytes = "bytes";
+ public static final String BytesSent = "bytes_sent";
+ public static final String BytesReceived = "bytes_received";
+ public static final String Packets = "packets";
+ public static final String StartTime = "start_time";
+ public static final String ElapsedTimeInSec = "elapsed_time_in_sec";
+ public static final String Padding = "padding";
+ public static final String PktsSent = "pkts_sent";
+ public static final String PktsReceived = "pkts_received";
+
+
+ @Override
+ public void init() {
+   // No-op: this parser does no work in init().
+ }
+
+ /**
+  * Parses one comma-separated Palo Alto log line into a single-element message list.
+  * @param msg raw message bytes, decoded as UTF-8
+  * @return a list holding the one parsed message, or null if decoding or parsing throws
+  */
+ @SuppressWarnings("unchecked")
+ public List<JSONObject> parse(byte[] msg) {
+   JSONObject outputMessage = new JSONObject();
+   String toParse = "";
+   List<JSONObject> messages = new ArrayList<>();
+   try {
+     toParse = new String(msg, "UTF-8");
+     _LOG.debug("Received message: " + toParse);
+     parseMessage(toParse, outputMessage);
+     // "timestamp" is the time of parsing, read from the clock once; it is not taken from the log line.
+     long timestamp = System.currentTimeMillis();
+     outputMessage.put("timestamp", timestamp);
+     outputMessage.put("ip_src_addr", outputMessage.remove("source_address"));
+     outputMessage.put("ip_src_port", outputMessage.remove("source_port"));
+     outputMessage.put("ip_dst_addr", outputMessage.remove("destination_address"));
+     outputMessage.put("ip_dst_port", outputMessage.remove("destination_port"));
+     outputMessage.put("protocol", outputMessage.remove("ip_protocol"));
+     outputMessage.put("original_string", toParse);
+     messages.add(outputMessage);
+     return messages;
+   } catch (Exception e) {
+     _LOG.error("Failed to parse: " + toParse, e);
+     return null;
+   }
+ }
+
+ /**
+  * Splits a comma-separated log line and stores each positional token, trimmed, under its field name.
+  * Tokens 0-30 are common; from 31 on the layout depends on whether the type (token 3) is THREAT.
+  * @param message the raw log line
+  * @param outputMessage the JSON object the fields are written into
+  */
+ @SuppressWarnings("unchecked")
+ private void parseMessage(String message, JSONObject outputMessage) {
+   String[] tokens = message.split(",");
+   String type = tokens[3].trim();
+   //populate common objects
+   outputMessage.put(PaloAltoDomain, tokens[0].trim());
+   outputMessage.put(ReceiveTime, tokens[1].trim());
+   outputMessage.put(SerialNum, tokens[2].trim());
+   outputMessage.put(Type, type);
+   outputMessage.put(ThreatContentType, tokens[4].trim());
+   outputMessage.put(ConfigVersion, tokens[5].trim());
+   outputMessage.put(GenerateTime, tokens[6].trim());
+   outputMessage.put(SourceAddress, tokens[7].trim());
+   outputMessage.put(DestinationAddress, tokens[8].trim());
+   outputMessage.put(NATSourceIP, tokens[9].trim());
+   outputMessage.put(NATDestinationIP, tokens[10].trim());
+   outputMessage.put(Rule, tokens[11].trim());
+   outputMessage.put(SourceUser, tokens[12].trim());
+   outputMessage.put(DestinationUser, tokens[13].trim());
+   outputMessage.put(Application, tokens[14].trim());
+   outputMessage.put(VirtualSystem, tokens[15].trim());
+   outputMessage.put(SourceZone, tokens[16].trim());
+   outputMessage.put(DestinationZone, tokens[17].trim());
+   outputMessage.put(InboundInterface, tokens[18].trim());
+   outputMessage.put(OutboundInterface, tokens[19].trim());
+   outputMessage.put(LogAction, tokens[20].trim());
+   outputMessage.put(TimeLogged, tokens[21].trim());
+   outputMessage.put(SessionID, tokens[22].trim());
+   outputMessage.put(RepeatCount, tokens[23].trim());
+   outputMessage.put(SourcePort, tokens[24].trim());
+   outputMessage.put(DestinationPort, tokens[25].trim());
+   outputMessage.put(NATSourcePort, tokens[26].trim());
+   outputMessage.put(NATDestinationPort, tokens[27].trim());
+   outputMessage.put(Flags, tokens[28].trim());
+   outputMessage.put(IPProtocol, tokens[29].trim());
+   outputMessage.put(Action, tokens[30].trim());
+   if ("THREAT".equalsIgnoreCase(type)) {
+     outputMessage.put(URL, tokens[31].trim());
+     try {
+       URL url = new URL(tokens[31].trim());
+       outputMessage.put(HOST, url.getHost());
+     } catch (MalformedURLException e) { // best effort: the host field is simply left out
+       _LOG.debug("Unable to extract host from url: " + tokens[31].trim(), e);
+     }
+     outputMessage.put(ThreatContentName, tokens[32].trim());
+     outputMessage.put(Category, tokens[33].trim());
+     outputMessage.put(Direction, tokens[34].trim());
+     outputMessage.put(Seqno, tokens[35].trim());
+     outputMessage.put(ActionFlags, tokens[36].trim());
+     outputMessage.put(SourceCountry, tokens[37].trim());
+     outputMessage.put(DestinationCountry, tokens[38].trim());
+     outputMessage.put(Cpadding, tokens[39].trim());
+     outputMessage.put(ContentType, tokens[40].trim());
+   } else {
+     outputMessage.put(Bytes, tokens[31].trim());
+     outputMessage.put(BytesSent, tokens[32].trim());
+     outputMessage.put(BytesReceived, tokens[33].trim());
+     outputMessage.put(Packets, tokens[34].trim());
+     outputMessage.put(StartTime, tokens[35].trim());
+     outputMessage.put(ElapsedTimeInSec, tokens[36].trim());
+     outputMessage.put(Category, tokens[37].trim());
+     outputMessage.put(Padding, tokens[38].trim());
+     outputMessage.put(Seqno, tokens[39].trim());
+     outputMessage.put(ActionFlags, tokens[40].trim());
+     outputMessage.put(SourceCountry, tokens[41].trim());
+     outputMessage.put(DestinationCountry, tokens[42].trim());
+     outputMessage.put(Cpadding, tokens[43].trim());
+     outputMessage.put(PktsSent, tokens[44].trim());
+     outputMessage.put(PktsReceived, tokens[45].trim());
+   }
+ }
-import org.apache.metron.parser.interfaces.MessageParser;
-
-public class BasicPaloAltoFirewallParser extends AbstractParser implements MessageParser{
-
- private static final long serialVersionUID = 3147090149725343999L;
- public static final String PaloAltoDomain = "palo_alto_domain";
- public static final String ReceiveTime = "receive_time";
- public static final String SerialNum = "serial_num";
- public static final String Type = "type";
- public static final String ThreatContentType = "threat_content_type";
- public static final String ConfigVersion = "config_version";
- public static final String GenerateTime = "generate_time";
- public static final String SourceAddress = "source_address";
- public static final String DestinationAddress = "destination_address";
- public static final String NATSourceIP = "nat_source_ip";
- public static final String NATDestinationIP = "nat_destination_ip";
- public static final String Rule = "rule";
- public static final String SourceUser = "source_user";
- public static final String DestinationUser = "destination_user";
- public static final String Application = "application";
- public static final String VirtualSystem = "virtual_system";
- public static final String SourceZone = "source_zone";
- public static final String DestinationZone = "destination_zone";
- public static final String InboundInterface = "inbound_interface";
- public static final String OutboundInterface = "outbound_interface";
- public static final String LogAction = "log_action";
- public static final String TimeLogged = "time_logged";
- public static final String SessionID = "session_id";
- public static final String RepeatCount = "repeat_count";
- public static final String SourcePort = "source_port";
- public static final String DestinationPort = "destination_port";
- public static final String NATSourcePort = "nats_source_port";
- public static final String NATDestinationPort = "nats_destination_port";
- public static final String Flags = "flags";
- public static final String IPProtocol = "ip_protocol";
- public static final String Action = "action";
-
- //Threat
- public static final String URL = "url";
- public static final String HOST = "host";
- public static final String ThreatContentName = "threat_content_name";
- public static final String Category = "category";
- public static final String Direction = "direction";
- public static final String Seqno = "seqno";
- public static final String ActionFlags = "action_flags";
- public static final String SourceCountry = "source_country";
- public static final String DestinationCountry = "destination_country";
- public static final String Cpadding = "cpadding";
- public static final String ContentType = "content_type";
-
- //Traffic
- public static final String Bytes = "content_type";
- public static final String BytesSent = "content_type";
- public static final String BytesReceived = "content_type";
- public static final String Packets = "content_type";
- public static final String StartTime = "content_type";
- public static final String ElapsedTimeInSec = "content_type";
- public static final String Padding = "content_type";
- public static final String PktsSent = "pkts_sent";
- public static final String PktsReceived = "pkts_received";
-
-
- @SuppressWarnings({ "unchecked", "unused" })
- public JSONObject parse(byte[] msg) {
-
- JSONObject outputMessage = new JSONObject();
- String toParse = "";
-
- try {
-
- toParse = new String(msg, "UTF-8");
- _LOG.debug("Received message: " + toParse);
-
-
- parseMessage(toParse,outputMessage);
-
- outputMessage.put("timestamp", System.currentTimeMillis());
- outputMessage.put("ip_src_addr", outputMessage.remove("source_address"));
- outputMessage.put("ip_src_port", outputMessage.remove("source_port"));
- outputMessage.put("ip_dst_addr", outputMessage.remove("destination_address"));
- outputMessage.put("ip_dst_port", outputMessage.remove("destination_port"));
- outputMessage.put("protocol", outputMessage.remove("ip_protocol"));
-
- outputMessage.put("original_string", toParse);
- return outputMessage;
- } catch (Exception e) {
- e.printStackTrace();
- _LOG.error("Failed to parse: " + toParse);
- return null;
- }
- }
-
- @SuppressWarnings("unchecked")
- private void parseMessage(String message,JSONObject outputMessage) {
-
- String[] tokens = message.split(",");
-
- String type = tokens[3].trim();
-
- //populate common objects
- outputMessage.put(PaloAltoDomain, tokens[0].trim());
- outputMessage.put(ReceiveTime, tokens[1].trim());
- outputMessage.put(SerialNum, tokens[2].trim());
- outputMessage.put(Type, type);
- outputMessage.put(ThreatContentType, tokens[4].trim());
- outputMessage.put(ConfigVersion, tokens[5].trim());
- outputMessage.put(GenerateTime, tokens[6].trim());
- outputMessage.put(SourceAddress, tokens[7].trim());
- outputMessage.put(DestinationAddress, tokens[8].trim());
- outputMessage.put(NATSourceIP, tokens[9].trim());
- outputMessage.put(NATDestinationIP, tokens[10].trim());
- outputMessage.put(Rule, tokens[11].trim());
- outputMessage.put(SourceUser, tokens[12].trim());
- outputMessage.put(DestinationUser, tokens[13].trim());
- outputMessage.put(Application, tokens[14].trim());
- outputMessage.put(VirtualSystem, tokens[15].trim());
- outputMessage.put(SourceZone, tokens[16].trim());
- outputMessage.put(DestinationZone, tokens[17].trim());
- outputMessage.put(InboundInterface, tokens[18].trim());
- outputMessage.put(OutboundInterface, tokens[19].trim());
- outputMessage.put(LogAction, tokens[20].trim());
- outputMessage.put(TimeLogged, tokens[21].trim());
- outputMessage.put(SessionID, tokens[22].trim());
- outputMessage.put(RepeatCount, tokens[23].trim());
- outputMessage.put(SourcePort, tokens[24].trim());
- outputMessage.put(DestinationPort, tokens[25].trim());
- outputMessage.put(NATSourcePort, tokens[26].trim());
- outputMessage.put(NATDestinationPort, tokens[27].trim());
- outputMessage.put(Flags, tokens[28].trim());
- outputMessage.put(IPProtocol, tokens[29].trim());
- outputMessage.put(Action, tokens[30].trim());
-
-
- if("THREAT".equals(type.toUpperCase())) {
- outputMessage.put(URL, tokens[31].trim());
- try {
- URL url = new URL(tokens[31].trim());
- outputMessage.put(HOST, url.getHost());
- } catch (MalformedURLException e) {
- }
- outputMessage.put(ThreatContentName, tokens[32].trim());
- outputMessage.put(Category, tokens[33].trim());
- outputMessage.put(Direction, tokens[34].trim());
- outputMessage.put(Seqno, tokens[35].trim());
- outputMessage.put(ActionFlags, tokens[36].trim());
- outputMessage.put(SourceCountry, tokens[37].trim());
- outputMessage.put(DestinationCountry, tokens[38].trim());
- outputMessage.put(Cpadding, tokens[39].trim());
- outputMessage.put(ContentType, tokens[40].trim());
-
- }
- else
- {
- outputMessage.put(Bytes, tokens[31].trim());
- outputMessage.put(BytesSent, tokens[32].trim());
- outputMessage.put(BytesReceived, tokens[33].trim());
- outputMessage.put(Packets, tokens[34].trim());
- outputMessage.put(StartTime, tokens[35].trim());
- outputMessage.put(ElapsedTimeInSec, tokens[36].trim());
- outputMessage.put(Category, tokens[37].trim());
- outputMessage.put(Padding, tokens[38].trim());
- outputMessage.put(Seqno, tokens[39].trim());
- outputMessage.put(ActionFlags, tokens[40].trim());
- outputMessage.put(SourceCountry, tokens[41].trim());
- outputMessage.put(DestinationCountry, tokens[42].trim());
- outputMessage.put(Cpadding, tokens[43].trim());
- outputMessage.put(PktsSent, tokens[44].trim());
- outputMessage.put(PktsReceived, tokens[45].trim());
- }
-
- }
-
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java
new file mode 100644
index 0000000..9a3b983
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java
@@ -0,0 +1,77 @@
+package org.apache.metron.parsing.parsers;
+
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public abstract class BasicParser implements
+        MessageParser<JSONObject>,
+        Serializable {
+
+  protected static final Logger LOG = LoggerFactory
+          .getLogger(BasicParser.class);
+
+  /**
+   * Checks that a parsed message carries the fields this base class requires.
+   *
+   * @param message the parsed message to check
+   * @return true only if both "original_string" and "timestamp" are present
+   */
+  @Override
+  public boolean validate(JSONObject message) {
+    // Parameterized logging: the message is only rendered when TRACE is enabled.
+    if (!message.containsKey("original_string")) {
+      LOG.trace("[Metron] Message does not have original_string: {}", message);
+      return false;
+    }
+    if (!message.containsKey("timestamp")) {
+      LOG.trace("[Metron] Message does not have timestamp: {}", message);
+      return false;
+    }
+    LOG.trace("[Metron] Message conforms to schema: {}", message);
+    return true;
+  }
+
+  /**
+   * Derives a key from the message's "ip_src_addr" and "ip_dst_addr" fields.
+   *
+   * @param value the parsed message
+   * @return "0" if neither address is usable or anything throws; the one usable
+   *         address if the other is missing or empty; otherwise the sum of both
+   *         addresses parsed as numbers with their dots removed
+   */
+  public String getKey(JSONObject value) {
+    try {
+      String ipSrcAddr = null;
+      String ipDstAddr = null;
+      if (value.containsKey("ip_src_addr")) {
+        ipSrcAddr = value.get("ip_src_addr").toString();
+      }
+      if (value.containsKey("ip_dst_addr")) {
+        ipDstAddr = value.get("ip_dst_addr").toString();
+      }
+      // Treat an empty address like a missing one so the key is never null or "".
+      boolean noSrc = ipSrcAddr == null || ipSrcAddr.isEmpty();
+      boolean noDst = ipDstAddr == null || ipDstAddr.isEmpty();
+      if (noSrc && noDst) {
+        return "0";
+      }
+      if (noSrc) {
+        return ipDstAddr;
+      }
+      if (noDst) {
+        return ipSrcAddr;
+      }
+      // Addition is commutative, so swapping source and destination gives the same key.
+      double ip1 = Double.parseDouble(ipSrcAddr.replace(".", ""));
+      double ip2 = Double.parseDouble(ipDstAddr.replace(".", ""));
+      return String.valueOf(ip1 + ip2);
+    } catch (Exception ignored) {
+      // Best effort: a message that cannot be keyed (e.g. non-numeric addresses) gets "0".
+      return "0";
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
index 91fe312..75cd92c 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
@@ -1,16 +1,21 @@
package org.apache.metron.parsing.parsers;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.metron.parser.interfaces.MessageParser;
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
@SuppressWarnings("serial")
-public class BasicSnortParser extends AbstractParser implements MessageParser {
+public class BasicSnortParser extends BasicParser {
+
+ private static final Logger _LOG = LoggerFactory
+ .getLogger(BasicSnortParser.class);
/**
* The default field names for Snort Alerts.
@@ -51,9 +56,15 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
private String recordDelimiter = ",";
@Override
- public JSONObject parse(byte[] rawMessage) {
+ public void init() {
+
+ }
+
+ @Override
+ public List<JSONObject> parse(byte[] rawMessage) {
JSONObject jsonMessage = new JSONObject();
+ List<JSONObject> messages = new ArrayList<>();
try {
// snort alerts expected as csv records
String csvMessage = new String(rawMessage, "UTF-8");
@@ -63,7 +74,7 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
if (records.length != fieldNames.length) {
throw new IllegalArgumentException("Unexpected number of fields, expected: " + fieldNames.length + " got: " + records.length);
}
-
+ long timestamp = 0L;
// build the json record from each field
for (int i=0; i<records.length; i++) {
@@ -73,7 +84,8 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
if("timestamp".equals(field)) {
// convert the timestamp to epoch
- jsonMessage.put("timestamp", toEpoch(record));
+ timestamp = toEpoch(record);
+ jsonMessage.put("timestamp", timestamp);
} else {
jsonMessage.put(field, record);
@@ -82,7 +94,7 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
// add original msg; required by 'checkForSchemaCorrectness'
jsonMessage.put("original_string", csvMessage);
-
+ messages.add(jsonMessage);
} catch (Exception e) {
_LOG.error("unable to parse message: " + rawMessage);
@@ -90,13 +102,13 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
return null;
}
- return jsonMessage;
+ return messages;
}
/**
* Parses Snort's default date-time representation and
* converts to epoch.
- * @param datetime Snort's default date-time as String '01/27-16:01:04.877970'
+ * @param snortDatetime Snort's default date-time as String '01/27-16:01:04.877970'
* @return epoch time
* @throws java.text.ParseException
*/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
index 3beed77..aa25511 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
@@ -17,15 +17,20 @@
package org.apache.metron.parsing.parsers;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.metron.parser.interfaces.MessageParser;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@SuppressWarnings("serial")
-public class BasicSourcefireParser extends AbstractParser implements MessageParser{
+public class BasicSourcefireParser extends BasicParser {
+
+ private static final Logger _LOG = LoggerFactory
+ .getLogger(BasicSourcefireParser.class);
public static final String hostkey = "host";
String domain_name_regex = "([^\\.]+)\\.([a-z]{2}|[a-z]{3}|([a-z]{2}\\.[a-z]{2}))$";
@@ -34,12 +39,17 @@ public class BasicSourcefireParser extends AbstractParser implements MessagePars
Pattern sidPattern = Pattern.compile(sidRegex);
Pattern pattern = Pattern.compile(domain_name_regex);
+ @Override
+ public void init() {
+
+ }
+
@SuppressWarnings({ "unchecked", "unused" })
- public JSONObject parse(byte[] msg) {
+ public List<JSONObject> parse(byte[] msg) {
JSONObject payload = new JSONObject();
String toParse = "";
-
+ List<JSONObject> messages = new ArrayList<>();
try {
toParse = new String(msg, "UTF-8");
@@ -80,8 +90,8 @@ public class BasicSourcefireParser extends AbstractParser implements MessagePars
payload.put("ip_dst_addr", dest);
dest_ip = dest;
}
-
- payload.put("timestamp", System.currentTimeMillis());
+ long timestamp = System.currentTimeMillis();
+ payload.put("timestamp", timestamp);
Matcher sidMatcher = sidPattern.matcher(toParse);
String originalString = null;
@@ -95,8 +105,8 @@ public class BasicSourcefireParser extends AbstractParser implements MessagePars
}
payload.put("original_string", originalString);
payload.put("signature_id", signatureId);
-
- return payload;
+ messages.add(payload);
+ return messages;
} catch (Exception e) {
e.printStackTrace();
_LOG.error("Failed to parse: " + toParse);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
index 48823e4..41ed29d 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
@@ -14,14 +14,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Calendar;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.Map;
/**
* Created by rmerriman on 1/27/16.
*/
-public class BasicYafParser extends AbstractParser {
+public class BasicYafParser extends BasicParser {
/**
@@ -104,17 +105,17 @@ public class BasicYafParser extends AbstractParser {
}
@SuppressWarnings("unchecked")
- public JSONObject parse(byte[] msg) {
+ public List<JSONObject> parse(byte[] msg) {
//return parseManual(msg);
return parseWithGrok(msg);
}
- private JSONObject parseWithGrok(byte[] msg) {
+ private List<JSONObject> parseWithGrok(byte[] msg) {
_LOG.trace("[Metron] Starting to parse incoming message with grok");
JSONObject jsonMessage = new JSONObject();
+ List<JSONObject> messages = new ArrayList<>();
try {
String rawMessage = new String(msg, "UTF-8");
- System.out.println("Received message: " + rawMessage);
Match gm = grok.match(rawMessage);
gm.captures();
@@ -123,8 +124,10 @@ public class BasicYafParser extends AbstractParser {
jsonMessage.put("original_string", rawMessage);
String startTime = (String) grokMap.get("start_time");
+ long timestamp = 0L;
if (startTime != null) {
- jsonMessage.put("timestamp", toEpoch(startTime));
+ timestamp = toEpoch(startTime);
+ jsonMessage.put("timestamp", timestamp);
} else {
jsonMessage.put("timestamp", "0");
}
@@ -136,11 +139,12 @@ public class BasicYafParser extends AbstractParser {
}
jsonMessage.remove("YAF_DELIMITED");
jsonMessage.remove("start_time");
+ messages.add(jsonMessage);
} catch (Exception e) {
e.printStackTrace();
return null;
}
- return jsonMessage;
+ return messages;
}
private JSONObject parseManual(byte[] msg) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
index 1238899..9f526b4 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
@@ -1,27 +1,20 @@
package org.apache.metron.parsing.parsers;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import oi.thekraken.grok.api.exception.GrokException;
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import oi.thekraken.grok.api.exception.GrokException;
-
-import org.apache.commons.io.IOUtils;
-import org.json.simple.JSONObject;
+import java.util.*;
-public class GrokAsaParser extends AbstractParser implements Serializable {
+public class GrokAsaParser extends BasicParser {
private static final long serialVersionUID = 945353287115350798L;
private transient Grok grok;
@@ -214,11 +207,11 @@ public class GrokAsaParser extends AbstractParser implements Serializable {
}
@Override
- public JSONObject parse(byte[] raw_message) {
+ public List<JSONObject> parse(byte[] raw_message) {
String toParse = "";
JSONObject toReturn;
-
+ List<JSONObject> messages = new ArrayList<>();
try {
toParse = new String(raw_message, "UTF-8");
@@ -240,11 +233,11 @@ public class GrokAsaParser extends AbstractParser implements Serializable {
toReturn.putAll(response);
//System.out.println("*******I MAPPED: " + toReturn);
-
- toReturn.put("timestamp", convertToEpoch(toReturn.get("MONTH").toString(), toReturn
- .get("MONTHDAY").toString(),
- toReturn.get("TIME").toString(),
- true));
+ long timestamp = convertToEpoch(toReturn.get("MONTH").toString(), toReturn
+ .get("MONTHDAY").toString(),
+ toReturn.get("TIME").toString(),
+ true);
+ toReturn.put("timestamp", timestamp);
toReturn.remove("MONTHDAY");
toReturn.remove("TIME");
@@ -255,8 +248,8 @@ public class GrokAsaParser extends AbstractParser implements Serializable {
toReturn.put("ip_src_addr", toReturn.remove("IPORHOST"));
toReturn.put("original_string", toParse);
-
- return toReturn;
+ messages.add(toReturn);
+ return messages;
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
index 98dcffa..92fe6b6 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
@@ -1,15 +1,16 @@
package org.apache.metron.parsing.parsers;
-import java.net.URL;
-
-import oi.thekraken.grok.api.Match;
import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
import oi.thekraken.grok.api.exception.GrokException;
-
import org.json.simple.JSONObject;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
-public class GrokSourcefireParser extends AbstractParser{
+public class GrokSourcefireParser extends BasicParser {
/**
*
@@ -38,14 +39,19 @@ public class GrokSourcefireParser extends AbstractParser{
grok = Grok.create(filepath);
grok.compile("%{"+pattern+"}");
}
-
+
@Override
- public JSONObject parse(byte[] raw_message) {
+ public void init() {
+
+ }
+
+ @Override
+ public List<JSONObject> parse(byte[] raw_message) {
JSONObject payload = new JSONObject();
String toParse = "";
JSONObject toReturn;
-
+ List<JSONObject> messages = new ArrayList<>();
try {
toParse = new String(raw_message, "UTF-8");
@@ -60,7 +66,8 @@ public class GrokSourcefireParser extends AbstractParser{
proto = proto.replace("{", "");
proto = proto.replace("}", "");
toReturn.put("protocol", proto);
- return toReturn;
+ messages.add(toReturn);
+ return messages;
}
catch(Exception e)
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java
deleted file mode 100644
index b8a6f6b..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.metron.parsing.parsers;
-import java.io.Serializable;
-
-import com.google.code.regexp.Pattern;
-
-public class GrokUtils implements Serializable {
-
- private static final long serialVersionUID = 7465176887422419286L;
- /**
- * Extract Grok patter like %{FOO} to FOO, Also Grok pattern with semantic.
- */
- public static final Pattern GROK_PATTERN = Pattern.compile(
- "%\\{" +
- "(?<name>" +
- "(?<pattern>[A-z0-9]+)" +
- "(?::(?<subname>[A-z0-9_:;\\/\\s\\.]+))?" +
- ")" +
- "(?:=(?<definition>" +
- "(?:" +
- "(?:[^{}]+|\\.+)+" +
- ")+" +
- ")" +
- ")?" +
- "\\}");
-
- }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
index 1ab12bc..8c69a4d 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
@@ -28,7 +28,7 @@ public class JSONCleaner implements Serializable {
* Takes a json String as input and removes any Special Chars (^ a-z A-Z 0-9) in the keys
*/
@SuppressWarnings({"unchecked","rawtypes"})
- public JSONObject Clean(String jsonString) throws ParseException
+ public JSONObject clean(String jsonString) throws ParseException
{
JSONParser parser = new JSONParser();
@@ -54,7 +54,7 @@ public class JSONCleaner implements Serializable {
String jsonText = "{\"first_1\": 123, \"second\": [4, 5, 6], \"third\": 789}";
JSONCleaner cleaner = new JSONCleaner();
try {
- //cleaner.Clean(jsonText);
+ //cleaner.clean(jsonText);
Map obj=new HashMap();
obj.put("name","foo");
obj.put("num",new Integer(100));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
index 44d9312..e3843d5 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
@@ -13,6 +13,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.parsing.utils.GrokUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java
deleted file mode 100644
index a63889d..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.metron.parsing.parsers;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.io.IOUtils;
-
-public class ParserUtils {
-
- public static final String PREFIX = "stream2file";
- public static final String SUFFIX = ".tmp";
-
- public static File stream2file(InputStream in) throws IOException {
- final File tempFile = File.createTempFile(PREFIX, SUFFIX);
- tempFile.deleteOnExit();
- try (FileOutputStream out = new FileOutputStream(tempFile)) {
- IOUtils.copy(in, out);
- }
- return tempFile;
- }
-}