You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/10 21:17:41 UTC

[3/4] incubator-metron git commit: METRON-33 Execute Enrichments in Parallel (merrimanr via cestella) closes apache/incubator-metron#19

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java
new file mode 100644
index 0000000..067e4b4
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/TelemetryParserBolt.java
@@ -0,0 +1,137 @@
+package org.apache.metron.bolt;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.domain.Enrichment;
+import org.apache.metron.filters.GenericMessageFilter;
+import org.apache.metron.helpers.topology.ErrorGenerator;
+import org.apache.metron.parser.interfaces.MessageFilter;
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class TelemetryParserBolt extends
+        SplitBolt<JSONObject> {
+
+  protected static final Logger LOG = LoggerFactory
+          .getLogger(TelemetryParserBolt.class);
+
+  protected MessageParser<JSONObject> parser;
+  protected MessageFilter<JSONObject> filter = new GenericMessageFilter();
+  protected List<Enrichment> enrichments = new ArrayList<>();
+
+  /**
+   * @param parser The parser class for parsing the incoming raw message byte
+   *               stream
+   * @return Instance of this class
+   */
+  public TelemetryParserBolt withMessageParser(MessageParser<JSONObject>
+                                                      parser) {
+    this.parser = parser;
+    return this;
+  }
+
+  /**
+   * @param filter A class for filtering/dropping incomming telemetry messages
+   * @return Instance of this class
+   */
+  public TelemetryParserBolt withMessageFilter(MessageFilter<JSONObject>
+                                                      filter) {
+    this.filter = filter;
+    return this;
+  }
+
+  /**
+   * @param enrichments A class for sending tuples to enrichment bolt
+   * @return Instance of this class
+   */
+  public TelemetryParserBolt withEnrichments(List<Enrichment> enrichments) {
+    this.enrichments = enrichments;
+    return this;
+  }
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    LOG.info("[Metron] Preparing TelemetryParser Bolt...");
+    if (this.parser == null) {
+      throw new IllegalStateException("MessageParser must be specified");
+    }
+    parser.init();
+  }
+
+  @Override
+  public String getKey(Tuple tuple, JSONObject message) {
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  public Set<String> getStreamIds() {
+    Set<String> streamIds = new HashSet<>();
+    for(Enrichment enrichment: enrichments) {
+      streamIds.add(enrichment.getName());
+    }
+    return streamIds;
+  }
+
+  @Override
+  public List<JSONObject> generateMessages(Tuple tuple) {
+    List<JSONObject> filteredMessages = new ArrayList<>();
+    byte[] originalMessage = tuple.getBinary(0);
+    try {
+      originalMessage = tuple.getBinary(0);
+      if (originalMessage == null || originalMessage.length == 0) {
+        throw new Exception("Invalid message length");
+      }
+      List<JSONObject> messages = parser.parse(originalMessage);
+      for (JSONObject message : messages) {
+        if (!parser.validate(message)) {
+          throw new Exception("Message validation failed: "
+                  + message);
+        } else {
+          if (filter != null && filter.emitTuple(message)) {
+            filteredMessages.add(message);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to parse telemetry message", e);
+      collector.fail(tuple);
+      JSONObject error = ErrorGenerator.generateErrorMessage(
+              "Parsing problem: " + new String(originalMessage), e);
+      collector.emit("error", new Values(error));
+    }
+    return filteredMessages;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, JSONObject> splitMessage(JSONObject message) {
+    Map<String, JSONObject> streamMessageMap = new HashMap<>();
+    for (Enrichment enrichment : enrichments) {
+      List<String> fields = enrichment.getFields();
+      if (fields != null && fields.size() > 0) {
+        JSONObject enrichmentObject = new JSONObject();
+        for (String field : fields) {
+          enrichmentObject.put(field, message.get(field));
+        }
+        streamMessageMap.put(enrichment.getName(), enrichmentObject);
+      }
+    }
+    return streamMessageMap;
+  }
+
+  @Override
+  public void declareOther(OutputFieldsDeclarer declarer) {
+
+  }
+
+  @Override
+  public void emitOther(Tuple tuple, List<JSONObject> messages) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
index 3a86957..f04bb1f 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/BroMessageFilter.java
@@ -1,16 +1,16 @@
 package org.apache.metron.filters;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.metron.parser.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-import org.apache.metron.parser.interfaces.MessageFilter;
-
-public class BroMessageFilter implements MessageFilter,Serializable {
+public class BroMessageFilter implements MessageFilter<JSONObject>,
+				Serializable {
 
 	/**
 	 * Filter protocols based on whitelists and blacklists
@@ -21,14 +21,14 @@ public class BroMessageFilter implements MessageFilter,Serializable {
 	private final Set<String> _known_protocols;
 
 	 /**
-	 * @param  filter  Commons configuration for reading properties files
+	 * @param  conf  Commons configuration for reading properties files
 	 * @param  key Key in a JSON mesage where the protocol field is located
 	 */
 	
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public BroMessageFilter(Configuration conf, String key) {
 		_key = key;
-		_known_protocols = new HashSet<String>();
+		_known_protocols = new HashSet<>();
 		List known_protocols = conf.getList("source.known.protocols");
 		_known_protocols.addAll(known_protocols);
 	}
@@ -39,6 +39,7 @@ public class BroMessageFilter implements MessageFilter,Serializable {
 	 */
 	
 	public boolean emitTuple(JSONObject message) {
-		return _known_protocols.contains(message.get(_key));
+		String protocol = (String) message.get(_key);
+		return _known_protocols.contains(protocol);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
index f136b2b..29413b7 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/filters/GenericMessageFilter.java
@@ -1,15 +1,13 @@
 package org.apache.metron.filters;
-import java.io.Serializable;
 
+import org.apache.metron.parser.interfaces.MessageFilter;
 import org.json.simple.JSONObject;
 
-import org.apache.metron.parser.interfaces.MessageFilter;
+import java.io.Serializable;
 
-public class GenericMessageFilter implements MessageFilter,Serializable {
+public class GenericMessageFilter implements MessageFilter<JSONObject>,
+				Serializable {
 
-	/**
-	 * 
-	 */
 	private static final long serialVersionUID = 3626397212398318852L;
 
 	public boolean emitTuple(JSONObject message) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java
deleted file mode 100644
index 5bec434..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/AbstractParserBolt.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsing;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-
-import com.codahale.metrics.Counter;
-import org.apache.metron.metrics.MetricReporter;
-import org.apache.metron.parser.interfaces.MessageFilter;
-import org.apache.metron.parser.interfaces.MessageParser;
-
-@SuppressWarnings("rawtypes")
-public abstract class AbstractParserBolt extends BaseRichBolt {
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = -6710596708304282838L;
-
-	protected static final Logger LOG = LoggerFactory
-			.getLogger(AbstractParserBolt.class);
-
-	protected OutputCollector _collector;
-	protected MessageParser _parser;
-
-	protected String OutputFieldName;
-	protected MetricReporter _reporter;
-	protected MessageFilter _filter;
-
-	protected Counter ackCounter, emitCounter, failCounter;
-
-	/**
-	 * Register counters to be reported to graphite
-	 * */
-
-	protected void registerCounters() {
-
-		String ackString = _parser.getClass().getSimpleName() + ".ack";
-
-		String emitString = _parser.getClass().getSimpleName() + ".emit";
-
-		String failString = _parser.getClass().getSimpleName() + ".fail";
-
-		ackCounter = _reporter.registerCounter(ackString);
-		emitCounter = _reporter.registerCounter(emitString);
-		failCounter = _reporter.registerCounter(failString);
-
-	}
-
-	/**
-	 * Check to make sure all required variables have been initialized
-	 * */
-
-	public final void prepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) {
-		_collector = collector;
-		if (this._parser == null)
-			throw new IllegalStateException("MessageParser must be specified");
-		if (this.OutputFieldName == null)
-			throw new IllegalStateException("OutputFieldName must be specified");
-
-		if (this._filter == null)
-			throw new IllegalStateException("MessageFilter must be specified");
-
-		try {
-			doPrepare(conf, topologyContext, collector);
-		} catch (IOException e) {
-			LOG.error("Counld not initialize...");
-			e.printStackTrace();
-		}
-	}
-
-	/**
-	 * @param parser
-	 *            The parser class for parsing the incoming raw message byte
-	 *            stream
-	 * @return Instance of this class
-	 */
-
-	public boolean checkForSchemaCorrectness(JSONObject message) {
-		int correct = 0;
-
-		
-		if (!(message.containsKey("original_string"))) {
-			LOG.trace("[Metron] Message does not have original_string: " + message);
-			return false;
-		} else if (!(message.containsKey("timestamp"))) { 
-			LOG.trace("[Metron] Message does not have timestamp: " + message);
-			return false;
-		} else {
-			LOG.trace("[Metron] Message conforms to schema: "
-					+ message);
-			return true;
-		}
-	}
-
-	abstract void doPrepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) throws IOException;
-
-	protected String generateTopologyKey(String src_ip, String dst_ip)
-			throws Exception {
-		try {
-			if (dst_ip == null && src_ip == null)
-				return "0";
-
-			if (src_ip == null || src_ip.length() == 0)
-				return dst_ip;
-
-			if (dst_ip == null || dst_ip.length() == 0)
-				return src_ip;
-
-			double ip1 = Double.parseDouble(src_ip.replace(".", ""));
-			double ip2 = Double.parseDouble(dst_ip.replace(".", ""));
-
-			return String.valueOf(ip1 + ip2);
-		} catch (Exception e) {
-			return "0";
-		}
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java
deleted file mode 100644
index 6f692d6..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/PcapParserBolt.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package org.apache.metron.parsing;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import org.apache.metron.helpers.topology.ErrorGenerator;
-import org.apache.metron.parsing.parsers.PcapParser;
-import org.apache.metron.pcap.PacketInfo;
-
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-
-
-/**
- * The Class PcapParserBolt parses each input tuple and emits a new tuple which
- * contains the information (header_json,group_key,pcap_id, timestamp, pcap) as
- * defined in the output schema.
- * 
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public class PcapParserBolt implements IRichBolt {
-
-  /** The Constant serialVersionUID. */
-  private static final long serialVersionUID = -1449830233777209255L;
-
-  /** The Constant LOG. */
-  private static final Logger LOG = Logger.getLogger(PcapParserBolt.class);
-
-  /** The collector. */
-  private OutputCollector collector = null;
-
-  /** The conf. */
-  @SuppressWarnings("rawtypes")
-private Map conf;
-
-  /** The number of chars to use for shuffle grouping. */
-  @SuppressWarnings("unused")
-private int numberOfCharsToUseForShuffleGrouping = 4;
-
-  /** The divisor to convert nanos to expected time precision. */
-  private long timePrecisionDivisor = 1L;
-
-
-  // HBaseStreamPartitioner hBaseStreamPartitioner = null ;
-
-  /**
-   * The Constructor.
-   */
-  public PcapParserBolt() {
-
-  }
-
-  public PcapParserBolt withTsPrecision(String tsPrecision) {
-	if (tsPrecision.equalsIgnoreCase("MILLI")) {
-	  //Convert nanos to millis
-	  LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
-	  timePrecisionDivisor = 1000000L;
-	} else if (tsPrecision.equalsIgnoreCase("MICRO")) {
-	  //Convert nanos to micro
-	  LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
-	  timePrecisionDivisor = 1000L;
-	} else if (tsPrecision.equalsIgnoreCase("NANO")) {
-	  //Keep nano as is.
-	  LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
-	  timePrecisionDivisor = 1L;
-	} else {
-	  LOG.info("bolt.parser.ts.precision not set. Default to NANO");
-	  timePrecisionDivisor = 1L;
-	}
-	return this;
-  }
-  
-  /*
-   * (non-Javadoc)
-   * 
-   * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm
-   * .topology.OutputFieldsDeclarer)
-   */
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-	  declarer.declareStream("message", new Fields("key", "message")); 
-    //declarer.declareStream("pcap_index_stream", new Fields("index_json", "pcap_id"));
-    declarer.declareStream("pcap_header_stream", new Fields("header_json", "pcap_id"));
-    declarer.declareStream("pcap_data_stream", new Fields("pcap_id", "timestamp", "pcap"));
-    declarer.declareStream("error", new Fields("error"));
-
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see backtype.storm.topology.IComponent#getComponentConfiguration()
-   */
-  /**
-   * Method getComponentConfiguration.
-   * 
-   * 
-   * 
-   * @return Map<String,Object> * @see
-   *         backtype.storm.topology.IComponent#getComponentConfiguration() * @see
-   *         backtype.storm.topology.IComponent#getComponentConfiguration() * @see
-   *         backtype.storm.topology.IComponent#getComponentConfiguration()
-   */
-
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see backtype.storm.task.IBolt#prepare(java.util.Map,
-   * backtype.storm.task.TopologyContext, backtype.storm.task.OutputCollector)
-   */
-
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
-    this.conf = stormConf;
-    if (conf.containsKey("bolt.parser.num.of.key.chars.to.use.for.shuffle.grouping")) {
-      this.numberOfCharsToUseForShuffleGrouping = Integer.valueOf(conf.get(
-          "bolt.parser.num.of.key.chars.to.use.for.shuffle.grouping").toString());
-    }
-    
-    Grouping._Fields a;
-
-
-    // hBaseStreamPartitioner = new HBaseStreamPartitioner(
-    // conf.get("bolt.hbase.table.name").toString(),
-    // 0,
-    // Integer.parseInt(conf.get("bolt.hbase.partitioner.region.info.refresh.interval.mins").toString()))
-    // ;
-    // hBaseStreamPartitioner.prepare();
-
-  }
-
-  /**
-   * Processes each input tuple and emits tuple which holds the following
-   * information about a network packet : group_key : first 3 digits of the
-   * pcap_id pcap_id : generated from network packet srcIp, dstIp, protocol,
-   * srcPort, dstPort header_json : contains global header, ipv4 header, tcp
-   * header(if the n/w protocol is tcp), udp header (if the n/w protocol is udp)
-   * timestamp : the n/w packet capture timestamp pcap : tuple in binary array.
-   * 
-   * @param input
-   *          Tuple
-   * @see backtype.storm.task.IBolt#execute(Tuple)
-   */
-
-  @SuppressWarnings("unchecked")
-public void execute(Tuple input) {
-
-    // LOG.debug("In PcapParserBolt bolt: Got tuple " + input);
-    // LOG.debug("Got this pcap : " + new String(input.getBinary(0)));
-
-    List<PacketInfo> packetInfoList = null;
-    try {
-      packetInfoList = PcapParser.parse(input.getBinary(0));
-
-      if (packetInfoList != null) {
-
-        for (PacketInfo packetInfo : packetInfoList) {
-        	
-        	
-        	String string_pcap = packetInfo.getJsonIndexDoc();
-        	Object obj=JSONValue.parse(string_pcap);
-        	  JSONObject header=(JSONObject)obj;
-        	
-        	JSONObject message = new JSONObject();
-        	//message.put("key", packetInfo.getKey());
-        	
-        	message.put("message", header);
-        	
-        	collector.emit("message", new Values(packetInfo.getKey(), message));
-        	
-        	//collector.emit("pcap_index_stream", new Values(packetInfo.getJsonIndexDoc(), packetInfo.getKey()));
-        	
-          collector.emit("pcap_header_stream", new Values(packetInfo.getJsonDoc(), packetInfo.getKey()));
-          collector.emit("pcap_data_stream", new Values(packetInfo.getKey(),
-             packetInfo.getPacketTimeInNanos() / timePrecisionDivisor,
-              input.getBinary(0)));
-
-          // collector.emit(new Values(packetInfo.getJsonDoc(), packetInfo
-          // .getKey().substring(0, numberOfCharsToUseForShuffleGrouping),
-          // packetInfo.getKey(), (packetInfo.getPacketHeader().getTsSec()
-          // * secMultiplier + packetInfo.getPacketHeader().getTsUsec()
-          // * microSecMultiplier), input.getBinary(0)));
-        }
-      }
-
-    } catch (Exception e) {
-      collector.fail(input);
-      e.printStackTrace();
-      LOG.error("Exception while processing tuple", e);
-      
-
-		JSONObject error = ErrorGenerator.generateErrorMessage(
-				"Alerts problem: " + input.getBinary(0), e);
-		collector.emit("error", new Values(error));
-		
-      return;
-    }
-    collector.ack(input);
-
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see backtype.storm.task.IBolt#cleanup()
-   */
-
-  public void cleanup() {
-    // TODO Auto-generated method stub
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java
deleted file mode 100644
index 8865d89..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/TelemetryParserBolt.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsing;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.helpers.topology.ErrorGenerator;
-import org.apache.metron.json.serialization.JSONEncoderHelper;
-import org.apache.metron.metrics.MetricReporter;
-import org.apache.metron.parser.interfaces.MessageFilter;
-import org.apache.metron.parser.interfaces.MessageParser;
-
-/**
- * Uses an adapter to parse a telemetry message from its native format into a
- * standard JSON. For a list of available adapter please check
- * org.apache.metron.parser.parsers. The input is a raw byte array and the output is a
- * JSONObject
- * <p>
- * The parsing conventions are as follows:
- * <p>
- * <ul>
- * 
- * <li>ip_src_addr = source ip of a message
- * <li>ip_dst_addr = destination ip of a message
- * <li>ip_src_port = source port of a message
- * <li>ip_dst_port = destination port of a message
- * <li>protocol = protocol of a message
- * <ul>
- * <p>
- * <p>
- * If a message does not contain at least one of these variables it will be
- * failed
- **/
-
-@SuppressWarnings("rawtypes")
-public class TelemetryParserBolt extends AbstractParserBolt {
-
-	private static final long serialVersionUID = -2647123143398352020L;
-	private JSONObject metricConfiguration;
-
-	/**
-	 * @param parser
-	 *            The parser class for parsing the incoming raw message byte
-	 *            stream
-	 * @return Instance of this class
-	 */
-
-	public TelemetryParserBolt withMessageParser(MessageParser parser) {
-		_parser = parser;
-		return this;
-	}
-
-	/**
-	 * @param OutputFieldName
-	 *            Field name of the output tuple
-	 * @return Instance of this class
-	 */
-
-	public TelemetryParserBolt withOutputFieldName(String OutputFieldName) {
-		this.OutputFieldName = OutputFieldName;
-		return this;
-	}
-
-	/**
-	 * @param filter
-	 *            A class for filtering/dropping incomming telemetry messages
-	 * @return Instance of this class
-	 */
-
-	public TelemetryParserBolt withMessageFilter(MessageFilter filter) {
-		this._filter = filter;
-		return this;
-	}
-
-	/**
-	 * @param config
-	 *            A class for generating custom metrics into graphite
-	 * @return Instance of this class
-	 */
-
-	public TelemetryParserBolt withMetricConfig(Configuration config) {
-		this.metricConfiguration = JSONEncoderHelper.getJSON(config
-				.subset("org.apache.metron.metrics"));
-		return this;
-	}
-
-	@Override
-	void doPrepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) throws IOException {
-
-		LOG.info("[Metron] Preparing TelemetryParser Bolt...");
-
-		if (metricConfiguration != null) {
-			_reporter = new MetricReporter();
-			_reporter
-					.initialize(metricConfiguration, TelemetryParserBolt.class);
-			LOG.info("[Metron] Metric reporter is initialized");
-		} else {
-			LOG.info("[Metron] Metric reporter is not initialized");
-		}
-		this.registerCounters();
-		
-		if(_parser != null)
-		_parser.init();
-		
-		
-	}
-
-	@SuppressWarnings("unchecked")
-	public void execute(Tuple tuple) {
-
-		LOG.trace("[Metron] Starting to process a new incoming tuple");
-
-		byte[] original_message = null;
-
-		try {
-
-			original_message = tuple.getBinary(0);
-
-			LOG.trace("[Metron] Starting the parsing process");
-
-			if (original_message == null || original_message.length == 0) {
-				LOG.error("Incomming tuple is null");
-				throw new Exception("Invalid message length");
-			}
-
-			LOG.trace("[Metron] Attempting to transofrm binary message to JSON");
-			JSONObject transformed_message = _parser.parse(original_message);
-			LOG.debug("[Metron] Transformed Telemetry message: "
-					+ transformed_message);
-
-			if (transformed_message == null || transformed_message.isEmpty())
-				throw new Exception("Unable to turn binary message into a JSON");
-
-			LOG.trace("[Metron] Checking if the transformed JSON conforms to the right schema");
-
-			if (!checkForSchemaCorrectness(transformed_message)) {
-				throw new Exception("Incorrect formatting on message: "
-						+ transformed_message);
-			}
-
-			else {
-				LOG.trace("[Metron] JSON message has the right schema");
-				boolean filtered = false;
-
-				if (_filter != null) {
-					if (!_filter.emitTuple(transformed_message)) {
-						filtered = true;
-					}
-				}
-
-				if (!filtered) {
-					String ip1 = null;
-
-					if (transformed_message.containsKey("ip_src_addr"))
-						ip1 = transformed_message.get("ip_src_addr").toString();
-
-					String ip2 = null;
-
-					if (transformed_message.containsKey("ip_dst_addr"))
-						ip2 = transformed_message.get("ip_dst_addr").toString();
-
-					String key = generateTopologyKey(ip1, ip2);
-
-					JSONObject new_message = new JSONObject();
-					new_message.put("message", transformed_message);
-					_collector.emit("message", new Values(key, new_message));
-				}
-
-				_collector.ack(tuple);
-				if (metricConfiguration != null)
-					ackCounter.inc();
-			}
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			LOG.error("Failed to parse telemetry message :" + original_message);
-			_collector.fail(tuple);
-
-			if (metricConfiguration != null)
-				failCounter.inc();
-
-			JSONObject error = ErrorGenerator.generateErrorMessage(
-					"Parsing problem: " + new String(original_message),
-					e);
-			_collector.emit("error", new Values(error));
-		}
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declearer) {
-		declearer.declareStream("message", new Fields("key", "message"));
-		declearer.declareStream("error", new Fields("message"));
-
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java
deleted file mode 100644
index bf8c596..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/AbstractParser.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsing.parsers;
-
-import java.io.Serializable;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.parser.interfaces.MessageParser;
-import org.apache.metron.parsing.AbstractParserBolt;
-
-@SuppressWarnings("serial")
-public abstract class AbstractParser implements MessageParser, Serializable {
-
-	protected static final Logger _LOG = LoggerFactory
-			.getLogger(AbstractParserBolt.class);
-
-	public void initializeParser() {
-		_LOG.debug("Initializing adapter...");
-		
-
-	}
-	
-	public void init() {
-		
-	}
-	
-	
-	abstract public JSONObject parse(byte[] raw_message);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
index 870991c..2b8b955 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
@@ -22,25 +22,33 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @SuppressWarnings("serial")
-public class BasicBroParser extends AbstractParser {
+public class BasicBroParser extends BasicParser {
 
     protected static final Logger _LOG = LoggerFactory
             .getLogger(BasicBroParser.class);
     private JSONCleaner cleaner = new JSONCleaner();
 
+    @Override
+    public void init() {
+
+    }
+
     @SuppressWarnings("unchecked")
-    public JSONObject parse(byte[] msg) {
+    public List<JSONObject> parse(byte[] msg) {
 
         _LOG.trace("[Metron] Starting to parse incoming message");
 
         String rawMessage = null;
-
+        List<JSONObject> messages = new ArrayList<>();
         try {
             rawMessage = new String(msg, "UTF-8");
             _LOG.trace("[Metron] Received message: " + rawMessage);
 
-            JSONObject cleanedMessage = cleaner.Clean(rawMessage);
+            JSONObject cleanedMessage = cleaner.clean(rawMessage);
             _LOG.debug("[Metron] Cleaned message: " + cleanedMessage);
 
             if (cleanedMessage == null || cleanedMessage.isEmpty()) {
@@ -77,9 +85,10 @@ public class BasicBroParser extends AbstractParser {
 
             replaceKey(payload, "timestamp", new String[]{ "ts" });
 
+            long timestamp = 0L;
             if (payload.containsKey("timestamp")) {
                 try {
-                    long timestamp = Long.parseLong(payload.get("timestamp").toString());
+                    timestamp = Long.parseLong(payload.get("timestamp").toString());
                     payload.put("timestamp", timestamp);
                 } catch (NumberFormatException nfe) {
                     _LOG.error(String.format("[Metron] timestamp is invalid: %s", payload.get("timestamp")));
@@ -102,8 +111,8 @@ public class BasicBroParser extends AbstractParser {
 
             payload.put("protocol", key);
             _LOG.debug("[Metron] Returning parsed message: " + payload);
-
-            return payload;
+            messages.add(payload);
+            return messages;
 
         } catch (Exception e) {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
index dbced1c..b5d1293 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFireEyeParser.java
@@ -1,36 +1,28 @@
 package org.apache.metron.parsing.parsers;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.parsing.utils.ParserUtils;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.StringUtils;
-import org.json.simple.JSONObject;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+public class BasicFireEyeParser extends BasicParser {
 
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import oi.thekraken.grok.api.exception.GrokException;
+	private static final long serialVersionUID = 6328907550159134550L;
+	protected static final Logger LOG = LoggerFactory
+					.getLogger(BasicFireEyeParser.class);
 
-public class BasicFireEyeParser extends AbstractParser implements Serializable {
 
-	private static final long serialVersionUID = 6328907550159134550L;
-	//String tsRegex = "(.*)([a-z][A-Z]+)\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)(.*)$";
 	String tsRegex ="([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)";
 	
 	
@@ -49,9 +41,14 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
 	}
 
 	@Override
-	public JSONObject parse(byte[] raw_message) {
-		String toParse = "";
+	public void init() {
 
+	}
+
+	@Override
+	public List<JSONObject> parse(byte[] raw_message) {
+		String toParse = "";
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 
 			toParse = new String(raw_message, "UTF-8");
@@ -80,8 +77,8 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
 			JSONObject toReturn = parseMessage(toParse);
 
 			toReturn.put("timestamp", getTimeStamp(toParse,delimiter));
-
-			return toReturn;
+			messages.add(toReturn);
+			return messages;
 
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -90,37 +87,6 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
 
 	}
 
-	public static Long convertToEpoch(String m, String d, String ts,
-			boolean adjust_timezone) throws ParseException {
-		d = d.trim();
-
-		if (d.length() <= 2)
-			d = "0" + d;
-
-		Date date = new SimpleDateFormat("MMM", Locale.ENGLISH).parse(m);
-		Calendar cal = Calendar.getInstance();
-		cal.setTime(date);
-		String month = String.valueOf(cal.get(Calendar.MONTH));
-		int year = Calendar.getInstance().get(Calendar.YEAR);
-
-		if (month.length() <= 2)
-			month = "0" + month;
-
-		String coglomerated_ts = year + "-" + month + "-" + d + " " + ts;
-
-		System.out.println(coglomerated_ts);
-
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-		if (adjust_timezone)
-			sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-		date = sdf.parse(coglomerated_ts);
-		long timeInMillisSinceEpoch = date.getTime();
-
-		return timeInMillisSinceEpoch;
-	}
-
 	private long getTimeStamp(String toParse,String delimiter) throws ParseException {
 		
 		long ts = 0;
@@ -134,8 +100,8 @@ public class BasicFireEyeParser extends AbstractParser implements Serializable {
 			time = tsMatcher.group(3);
 	
 				} else {
-			_LOG.warn("Unable to find timestamp in message: " + toParse);
-			ts = convertToEpoch(month, day, time, true);
+			LOG.warn("Unable to find timestamp in message: " + toParse);
+			ts = ParserUtils.convertToEpoch(month, day, time, true);
 		}
 
 			return ts;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
index 416491d..d4cdbe5 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicIseParser.java
@@ -17,34 +17,41 @@
 
 package org.apache.metron.parsing.parsers;
 
-import java.io.StringReader;
-
+import com.esotericsoftware.minlog.Log;
+import org.apache.metron.ise.parser.ISEParser;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.minlog.Log;
-import org.apache.metron.ise.parser.ISEParser;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
 
 @SuppressWarnings("serial")
-public class BasicIseParser extends AbstractParser {
+public class BasicIseParser extends BasicParser {
 
-	protected static final Logger _LOG = LoggerFactory
+	private static final Logger _LOG = LoggerFactory
 			.getLogger(BasicIseParser.class);
 	static final transient ISEParser _parser = new ISEParser("header=");
 
+	@Override
+	public void init() {
+
+	}
+
 	@SuppressWarnings("unchecked")
-	public JSONObject parse(byte[] msg) {
+	@Override
+	public List<JSONObject> parse(byte[] msg) {
 	
 		String raw_message = "";
-
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 
 			raw_message = new String(msg, "UTF-8");
 			_LOG.debug("Received message: " + raw_message);
-			
+
 			/*
-			 * Reinitialize Parser. It has the effect of calling the constructor again. 
+			 * Reinitialize Parser. It has the effect of calling the constructor again.
 			 */
 			_parser.ReInit(new StringReader("header=" + raw_message.trim()));
 
@@ -54,7 +61,7 @@ public class BasicIseParser extends AbstractParser {
 			String ip_src_port = (String) payload.get("Device Port");
 			String ip_dst_addr = (String) payload.get("DestinationIPAddress");
 			String ip_dst_port = (String) payload.get("DestinationPort");
-			
+
 			/*
 			 * Standard Fields for Metron.
 			 */
@@ -67,11 +74,8 @@ public class BasicIseParser extends AbstractParser {
 				payload.put("ip_dst_addr", ip_dst_addr);
 			if(ip_dst_port != null)
 				payload.put("ip_dst_port", ip_dst_port);
-
-			JSONObject message = new JSONObject();
-			//message.put("message", payload);
-
-			return payload;
+			messages.add(payload);
+			return messages;
 
 		} catch (Exception e) {
 			Log.error(e.toString());
@@ -80,5 +84,10 @@ public class BasicIseParser extends AbstractParser {
 		return null;
 	}
 
+	@Override
+	public boolean validate(JSONObject message) {
+		return true;
+	}
+
 	
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
index ab2a508..b8be87f 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLancopeParser.java
@@ -17,23 +17,35 @@
 
 package org.apache.metron.parsing.parsers;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 
 @SuppressWarnings("serial")
-public class BasicLancopeParser extends AbstractParser {
+public class BasicLancopeParser extends BasicParser {
 	// Sample Lancope Message
 	// {"message":"<131>Jul 17 15:59:01 smc-01 StealthWatch[12365]: 2014-07-17T15:58:30Z 10.40.10.254 0.0.0.0 Minor High Concern Index The host's concern index has either exceeded the CI threshold or rapidly increased. Observed 36.55M points. Policy maximum allows up to 20M points.","@version":"1","@timestamp":"2014-07-17T15:56:05.992Z","type":"syslog","host":"10.122.196.201"}
 
-	@SuppressWarnings("unchecked")
+	private static final Logger _LOG = LoggerFactory.getLogger(BasicLancopeParser
+					.class);
+
 	@Override
-	public JSONObject parse(byte[] msg) {
+	public void init() {
 
-		JSONObject payload = null;
+	}
 
+	//@SuppressWarnings("unchecked")
+	@Override
+	public List<JSONObject> parse(byte[] msg) {
+
+		JSONObject payload = null;
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 			
 			String raw_message = new String(msg, "UTF-8");
@@ -56,13 +68,15 @@ public class BasicLancopeParser extends AbstractParser {
 			Date date;
 
 			date = formatter.parse(fixed_date);
-			payload.put("timestamp", date.getTime());
+			long timestamp = date.getTime();
+			payload.put("timestamp", timestamp);
 
 			payload.remove("@timestamp");
 			payload.remove("message");
 			payload.put("original_string", message);
 
-			return payload;
+			messages.add(payload);
+			return messages;
 		} catch (Exception e) {
 
 			_LOG.error("Unable to parse message: " + payload.toJSONString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
index 39fe207..32b7433 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicLogstashParser.java
@@ -1,17 +1,22 @@
 package org.apache.metron.parsing.parsers;
 
-import java.io.UnsupportedEncodingException;
-import java.text.SimpleDateFormat;
-
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
 
-public class BasicLogstashParser extends AbstractParser {
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BasicLogstashParser extends BasicParser {
 
 	@Override
-	public JSONObject parse(byte[] raw_message) {
-		
+	public void init() {
+
+	}
+
+	@Override
+	public List<JSONObject> parse(byte[] raw_message) {
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 			
 			/*
@@ -38,9 +43,10 @@ public class BasicLogstashParser extends AbstractParser {
 			rawJson = mutate(rawJson, "src_ip", "ip_src_addr");
 			
 			// convert timestamp to milli since epoch
-			rawJson.put("timestamp", LogstashToEpoch((String) rawJson.remove("@timestamp")));
-
-			return rawJson;
+			long timestamp = LogstashToEpoch((String) rawJson.remove("@timestamp"));
+			rawJson.put("timestamp", timestamp);
+			messages.add(rawJson);
+			return messages;
 		} catch (Exception e) {
 			e.printStackTrace();
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
index dd87130..ac105d7 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
@@ -2,183 +2,190 @@ package org.apache.metron.parsing.parsers;
 
 
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BasicPaloAltoFirewallParser extends BasicParser {
+
+  private static final Logger _LOG = LoggerFactory.getLogger
+          (BasicPaloAltoFirewallParser.class);
+
+  private static final long serialVersionUID = 3147090149725343999L;
+  public static final String PaloAltoDomain = "palo_alto_domain";
+  public static final String ReceiveTime = "receive_time";
+  public static final String SerialNum = "serial_num";
+  public static final String Type = "type";
+  public static final String ThreatContentType = "threat_content_type";
+  public static final String ConfigVersion = "config_version";
+  public static final String GenerateTime = "generate_time";
+  public static final String SourceAddress = "source_address";
+  public static final String DestinationAddress = "destination_address";
+  public static final String NATSourceIP = "nat_source_ip";
+  public static final String NATDestinationIP = "nat_destination_ip";
+  public static final String Rule = "rule";
+  public static final String SourceUser = "source_user";
+  public static final String DestinationUser = "destination_user";
+  public static final String Application = "application";
+  public static final String VirtualSystem = "virtual_system";
+  public static final String SourceZone = "source_zone";
+  public static final String DestinationZone = "destination_zone";
+  public static final String InboundInterface = "inbound_interface";
+  public static final String OutboundInterface = "outbound_interface";
+  public static final String LogAction = "log_action";
+  public static final String TimeLogged = "time_logged";
+  public static final String SessionID = "session_id";
+  public static final String RepeatCount = "repeat_count";
+  public static final String SourcePort = "source_port";
+  public static final String DestinationPort = "destination_port";
+  public static final String NATSourcePort = "nats_source_port";
+  public static final String NATDestinationPort = "nats_destination_port";
+  public static final String Flags = "flags";
+  public static final String IPProtocol = "ip_protocol";
+  public static final String Action = "action";
+
+  //Threat
+  public static final String URL = "url";
+  public static final String HOST = "host";
+  public static final String ThreatContentName = "threat_content_name";
+  public static final String Category = "category";
+  public static final String Direction = "direction";
+  public static final String Seqno = "seqno";
+  public static final String ActionFlags = "action_flags";
+  public static final String SourceCountry = "source_country";
+  public static final String DestinationCountry = "destination_country";
+  public static final String Cpadding = "cpadding";
+  public static final String ContentType = "content_type";
+
+  //Traffic
+  public static final String Bytes = "content_type";
+  public static final String BytesSent = "content_type";
+  public static final String BytesReceived = "content_type";
+  public static final String Packets = "content_type";
+  public static final String StartTime = "content_type";
+  public static final String ElapsedTimeInSec = "content_type";
+  public static final String Padding = "content_type";
+  public static final String PktsSent = "pkts_sent";
+  public static final String PktsReceived = "pkts_received";
+
+
+  @Override
+  public void init() {
+
+  }
+
+  @SuppressWarnings({"unchecked", "unused"})
+  public List<JSONObject> parse(byte[] msg) {
+
+    JSONObject outputMessage = new JSONObject();
+    String toParse = "";
+    List<JSONObject> messages = new ArrayList<>();
+    try {
+
+      toParse = new String(msg, "UTF-8");
+      _LOG.debug("Received message: " + toParse);
+
+
+      parseMessage(toParse, outputMessage);
+      long timestamp = System.currentTimeMillis();
+      outputMessage.put("timestamp", System.currentTimeMillis());
+      outputMessage.put("ip_src_addr", outputMessage.remove("source_address"));
+      outputMessage.put("ip_src_port", outputMessage.remove("source_port"));
+      outputMessage.put("ip_dst_addr", outputMessage.remove("destination_address"));
+      outputMessage.put("ip_dst_port", outputMessage.remove("destination_port"));
+      outputMessage.put("protocol", outputMessage.remove("ip_protocol"));
+
+      outputMessage.put("original_string", toParse);
+      messages.add(outputMessage);
+      return messages;
+    } catch (Exception e) {
+      e.printStackTrace();
+      _LOG.error("Failed to parse: " + toParse);
+      return null;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void parseMessage(String message, JSONObject outputMessage) {
+
+    String[] tokens = message.split(",");
+
+    String type = tokens[3].trim();
+
+    //populate common objects
+    outputMessage.put(PaloAltoDomain, tokens[0].trim());
+    outputMessage.put(ReceiveTime, tokens[1].trim());
+    outputMessage.put(SerialNum, tokens[2].trim());
+    outputMessage.put(Type, type);
+    outputMessage.put(ThreatContentType, tokens[4].trim());
+    outputMessage.put(ConfigVersion, tokens[5].trim());
+    outputMessage.put(GenerateTime, tokens[6].trim());
+    outputMessage.put(SourceAddress, tokens[7].trim());
+    outputMessage.put(DestinationAddress, tokens[8].trim());
+    outputMessage.put(NATSourceIP, tokens[9].trim());
+    outputMessage.put(NATDestinationIP, tokens[10].trim());
+    outputMessage.put(Rule, tokens[11].trim());
+    outputMessage.put(SourceUser, tokens[12].trim());
+    outputMessage.put(DestinationUser, tokens[13].trim());
+    outputMessage.put(Application, tokens[14].trim());
+    outputMessage.put(VirtualSystem, tokens[15].trim());
+    outputMessage.put(SourceZone, tokens[16].trim());
+    outputMessage.put(DestinationZone, tokens[17].trim());
+    outputMessage.put(InboundInterface, tokens[18].trim());
+    outputMessage.put(OutboundInterface, tokens[19].trim());
+    outputMessage.put(LogAction, tokens[20].trim());
+    outputMessage.put(TimeLogged, tokens[21].trim());
+    outputMessage.put(SessionID, tokens[22].trim());
+    outputMessage.put(RepeatCount, tokens[23].trim());
+    outputMessage.put(SourcePort, tokens[24].trim());
+    outputMessage.put(DestinationPort, tokens[25].trim());
+    outputMessage.put(NATSourcePort, tokens[26].trim());
+    outputMessage.put(NATDestinationPort, tokens[27].trim());
+    outputMessage.put(Flags, tokens[28].trim());
+    outputMessage.put(IPProtocol, tokens[29].trim());
+    outputMessage.put(Action, tokens[30].trim());
+
+
+    if ("THREAT".equals(type.toUpperCase())) {
+      outputMessage.put(URL, tokens[31].trim());
+      try {
+        URL url = new URL(tokens[31].trim());
+        outputMessage.put(HOST, url.getHost());
+      } catch (MalformedURLException e) {
+      }
+      outputMessage.put(ThreatContentName, tokens[32].trim());
+      outputMessage.put(Category, tokens[33].trim());
+      outputMessage.put(Direction, tokens[34].trim());
+      outputMessage.put(Seqno, tokens[35].trim());
+      outputMessage.put(ActionFlags, tokens[36].trim());
+      outputMessage.put(SourceCountry, tokens[37].trim());
+      outputMessage.put(DestinationCountry, tokens[38].trim());
+      outputMessage.put(Cpadding, tokens[39].trim());
+      outputMessage.put(ContentType, tokens[40].trim());
+
+    } else {
+      outputMessage.put(Bytes, tokens[31].trim());
+      outputMessage.put(BytesSent, tokens[32].trim());
+      outputMessage.put(BytesReceived, tokens[33].trim());
+      outputMessage.put(Packets, tokens[34].trim());
+      outputMessage.put(StartTime, tokens[35].trim());
+      outputMessage.put(ElapsedTimeInSec, tokens[36].trim());
+      outputMessage.put(Category, tokens[37].trim());
+      outputMessage.put(Padding, tokens[38].trim());
+      outputMessage.put(Seqno, tokens[39].trim());
+      outputMessage.put(ActionFlags, tokens[40].trim());
+      outputMessage.put(SourceCountry, tokens[41].trim());
+      outputMessage.put(DestinationCountry, tokens[42].trim());
+      outputMessage.put(Cpadding, tokens[43].trim());
+      outputMessage.put(PktsSent, tokens[44].trim());
+      outputMessage.put(PktsReceived, tokens[45].trim());
+    }
+
+  }
 
-import org.apache.metron.parser.interfaces.MessageParser;
-
-public class BasicPaloAltoFirewallParser extends AbstractParser implements MessageParser{
-
-	private static final long serialVersionUID = 3147090149725343999L;
-	public static final String PaloAltoDomain  = "palo_alto_domain";
-	public static final String ReceiveTime  = "receive_time";
-	public static final String SerialNum  = "serial_num";
-	public static final String Type  = "type";
-	public static final String ThreatContentType  = "threat_content_type";
-	public static final String ConfigVersion  = "config_version";
-	public static final String GenerateTime  = "generate_time";
-	public static final String SourceAddress  = "source_address";
-	public static final String DestinationAddress  = "destination_address";
-	public static final String NATSourceIP  = "nat_source_ip";
-	public static final String NATDestinationIP  = "nat_destination_ip";
-	public static final String Rule  = "rule";
-	public static final String SourceUser  = "source_user";
-	public static final String DestinationUser  = "destination_user";
-	public static final String Application  = "application";
-	public static final String VirtualSystem  = "virtual_system";
-	public static final String SourceZone  = "source_zone";
-	public static final String DestinationZone  = "destination_zone";
-	public static final String InboundInterface  = "inbound_interface";
-	public static final String OutboundInterface  = "outbound_interface";
-	public static final String LogAction  = "log_action";
-	public static final String TimeLogged  = "time_logged";
-	public static final String SessionID  = "session_id";
-	public static final String RepeatCount  = "repeat_count";
-	public static final String SourcePort  = "source_port";
-	public static final String DestinationPort  = "destination_port";
-	public static final String NATSourcePort  = "nats_source_port";
-	public static final String NATDestinationPort  = "nats_destination_port";
-	public static final String Flags  = "flags";
-	public static final String IPProtocol  = "ip_protocol";
-	public static final String Action  = "action";
-	
-	//Threat
-	public static final String URL  = "url";
-	public static final String HOST  = "host";
-	public static final String ThreatContentName  = "threat_content_name";
-	public static final String Category  = "category";
-	public static final String Direction  = "direction";
-	public static final String Seqno  = "seqno";
-	public static final String ActionFlags  = "action_flags";
-	public static final String SourceCountry  = "source_country";
-	public static final String DestinationCountry  = "destination_country";
-	public static final String Cpadding  = "cpadding";
-	public static final String ContentType = "content_type";
-	
-	//Traffic
-	public static final String Bytes = "content_type";
-	public static final String BytesSent = "content_type";
-	public static final String BytesReceived = "content_type";
-	public static final String Packets = "content_type";
-	public static final String StartTime = "content_type";
-	public static final String ElapsedTimeInSec = "content_type";
-	public static final String Padding = "content_type";
-	public static final String PktsSent = "pkts_sent";
-	public static final String PktsReceived = "pkts_received";
-	
-
-	@SuppressWarnings({ "unchecked", "unused" })
-	public JSONObject parse(byte[] msg) {
-
-		JSONObject outputMessage = new JSONObject();
-		String toParse = "";
-
-		try {
-
-			toParse = new String(msg, "UTF-8");
-			_LOG.debug("Received message: " + toParse);
-			
-			
-			parseMessage(toParse,outputMessage);
-			
-				outputMessage.put("timestamp", System.currentTimeMillis());
-				outputMessage.put("ip_src_addr", outputMessage.remove("source_address"));
-				outputMessage.put("ip_src_port", outputMessage.remove("source_port"));
-				outputMessage.put("ip_dst_addr", outputMessage.remove("destination_address"));
-				outputMessage.put("ip_dst_port", outputMessage.remove("destination_port"));
-				outputMessage.put("protocol", outputMessage.remove("ip_protocol"));
-				
-				outputMessage.put("original_string", toParse);
-			return outputMessage;
-		} catch (Exception e) {
-			e.printStackTrace();
-			_LOG.error("Failed to parse: " + toParse);
-			return null;
-		}
-	}
-		
-		@SuppressWarnings("unchecked")
-		private void parseMessage(String message,JSONObject outputMessage) {
-			
-			String[] tokens = message.split(",");
-			
-			String type = tokens[3].trim();
-			
-			//populate common objects
-			outputMessage.put(PaloAltoDomain, tokens[0].trim());
-			outputMessage.put(ReceiveTime, tokens[1].trim());
-			outputMessage.put(SerialNum, tokens[2].trim());
-			outputMessage.put(Type, type);
-			outputMessage.put(ThreatContentType, tokens[4].trim());
-			outputMessage.put(ConfigVersion, tokens[5].trim());
-			outputMessage.put(GenerateTime, tokens[6].trim());
-			outputMessage.put(SourceAddress, tokens[7].trim());
-			outputMessage.put(DestinationAddress, tokens[8].trim());
-			outputMessage.put(NATSourceIP, tokens[9].trim());
-			outputMessage.put(NATDestinationIP, tokens[10].trim());
-			outputMessage.put(Rule, tokens[11].trim());
-			outputMessage.put(SourceUser, tokens[12].trim());
-			outputMessage.put(DestinationUser, tokens[13].trim());
-			outputMessage.put(Application, tokens[14].trim());
-			outputMessage.put(VirtualSystem, tokens[15].trim());
-			outputMessage.put(SourceZone, tokens[16].trim());
-			outputMessage.put(DestinationZone, tokens[17].trim());
-			outputMessage.put(InboundInterface, tokens[18].trim());
-			outputMessage.put(OutboundInterface, tokens[19].trim());
-			outputMessage.put(LogAction, tokens[20].trim());
-			outputMessage.put(TimeLogged, tokens[21].trim());
-			outputMessage.put(SessionID, tokens[22].trim());
-			outputMessage.put(RepeatCount, tokens[23].trim());
-			outputMessage.put(SourcePort, tokens[24].trim());
-			outputMessage.put(DestinationPort, tokens[25].trim());
-			outputMessage.put(NATSourcePort, tokens[26].trim());
-			outputMessage.put(NATDestinationPort, tokens[27].trim());
-			outputMessage.put(Flags, tokens[28].trim());
-			outputMessage.put(IPProtocol, tokens[29].trim());
-			outputMessage.put(Action, tokens[30].trim());
-			
-			
-			if("THREAT".equals(type.toUpperCase())) {
-				outputMessage.put(URL, tokens[31].trim());
-				try {
-					URL url = new URL(tokens[31].trim());
-					outputMessage.put(HOST, url.getHost());
-				} catch (MalformedURLException e) {
-				}
-				outputMessage.put(ThreatContentName, tokens[32].trim());
-				outputMessage.put(Category, tokens[33].trim());
-				outputMessage.put(Direction, tokens[34].trim());
-				outputMessage.put(Seqno, tokens[35].trim());
-				outputMessage.put(ActionFlags, tokens[36].trim());
-				outputMessage.put(SourceCountry, tokens[37].trim());
-				outputMessage.put(DestinationCountry, tokens[38].trim());
-				outputMessage.put(Cpadding, tokens[39].trim());
-				outputMessage.put(ContentType, tokens[40].trim());
-				
-			}
-			else
-			{
-				outputMessage.put(Bytes, tokens[31].trim());
-				outputMessage.put(BytesSent, tokens[32].trim());
-				outputMessage.put(BytesReceived, tokens[33].trim());
-				outputMessage.put(Packets, tokens[34].trim());
-				outputMessage.put(StartTime, tokens[35].trim());
-				outputMessage.put(ElapsedTimeInSec, tokens[36].trim());
-				outputMessage.put(Category, tokens[37].trim());
-				outputMessage.put(Padding, tokens[38].trim());
-				outputMessage.put(Seqno, tokens[39].trim());
-				outputMessage.put(ActionFlags, tokens[40].trim());
-				outputMessage.put(SourceCountry, tokens[41].trim());
-				outputMessage.put(DestinationCountry, tokens[42].trim());
-				outputMessage.put(Cpadding, tokens[43].trim());
-				outputMessage.put(PktsSent, tokens[44].trim());
-				outputMessage.put(PktsReceived, tokens[45].trim());
-			}
-			
-		}
-
-		
-		
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java
new file mode 100644
index 0000000..9a3b983
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicParser.java
@@ -0,0 +1,54 @@
+package org.apache.metron.parsing.parsers;
+
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public abstract class BasicParser implements
+        MessageParser<JSONObject>,
+        Serializable {
+
+  protected static final Logger LOG = LoggerFactory
+          .getLogger(BasicParser.class);
+
+  @Override
+  public boolean validate(JSONObject message) {
+    JSONObject value = message;
+    if (!(value.containsKey("original_string"))) {
+      LOG.trace("[Metron] Message does not have original_string: " + message);
+      return false;
+    } else if (!(value.containsKey("timestamp"))) {
+      LOG.trace("[Metron] Message does not have timestamp: " + message);
+      return false;
+    } else {
+      LOG.trace("[Metron] Message conforms to schema: "
+              + message);
+      return true;
+    }
+  }
+
+  public String getKey(JSONObject value) {
+    try {
+      String ipSrcAddr = null;
+      String ipDstAddr = null;
+      if (value.containsKey("ip_src_addr"))
+        ipSrcAddr = value.get("ip_src_addr").toString();
+      if (value.containsKey("ip_dst_addr"))
+        ipDstAddr = value.get("ip_dst_addr").toString();
+      if (ipSrcAddr == null && ipDstAddr == null)
+        return "0";
+      if (ipSrcAddr == null || ipSrcAddr.length() == 0)
+        return ipDstAddr;
+      if (ipDstAddr == null || ipDstAddr.length() == 0)
+        return ipSrcAddr;
+      double ip1 = Double.parseDouble(ipSrcAddr.replace(".", ""));
+      double ip2 = Double.parseDouble(ipDstAddr.replace(".", ""));
+      return String.valueOf(ip1 + ip2);
+    } catch (Exception e) {
+      return "0";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
index 91fe312..75cd92c 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
@@ -1,16 +1,21 @@
 package org.apache.metron.parsing.parsers;
 
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.metron.parser.interfaces.MessageParser;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
 
 @SuppressWarnings("serial")
-public class BasicSnortParser extends AbstractParser implements MessageParser {
+public class BasicSnortParser extends BasicParser {
+
+	private static final Logger _LOG = LoggerFactory
+					.getLogger(BasicSnortParser.class);
 
 	/**
 	 * The default field names for Snort Alerts.
@@ -51,9 +56,15 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
 	private String recordDelimiter = ",";
 
 	@Override
-	public JSONObject parse(byte[] rawMessage) {
+	public void init() {
+
+	}
+
+	@Override
+	public List<JSONObject> parse(byte[] rawMessage) {
 
 		JSONObject jsonMessage = new JSONObject();
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 			// snort alerts expected as csv records
 			String csvMessage = new String(rawMessage, "UTF-8");
@@ -63,7 +74,7 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
 			if (records.length != fieldNames.length) {
 				throw new IllegalArgumentException("Unexpected number of fields, expected: " + fieldNames.length + " got: " + records.length);
 			}
-
+			long timestamp = 0L;
 			// build the json record from each field
 			for (int i=0; i<records.length; i++) {
 			
@@ -73,7 +84,8 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
 				if("timestamp".equals(field)) {
 
 					// convert the timestamp to epoch
-					jsonMessage.put("timestamp", toEpoch(record));
+					timestamp = toEpoch(record);
+					jsonMessage.put("timestamp", timestamp);
 					
 				} else {
 					jsonMessage.put(field, record);
@@ -82,7 +94,7 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
 
 			// add original msg; required by 'checkForSchemaCorrectness'
 			jsonMessage.put("original_string", csvMessage);
-
+			messages.add(jsonMessage);
 		} catch (Exception e) {
 
             _LOG.error("unable to parse message: " + rawMessage);
@@ -90,13 +102,13 @@ public class BasicSnortParser extends AbstractParser implements MessageParser {
             return null;
         }
 
-		return jsonMessage;
+		return messages;
 	}
 
 	/**
 	 * Parses Snort's default date-time representation and
 	 * converts to epoch.
-	 * @param datetime Snort's default date-time as String '01/27-16:01:04.877970'
+	 * @param snortDatetime Snort's default date-time as String '01/27-16:01:04.877970'
 	 * @return epoch time
 	 * @throws java.text.ParseException 
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
index 3beed77..aa25511 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSourcefireParser.java
@@ -17,15 +17,20 @@
 
 package org.apache.metron.parsing.parsers;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.metron.parser.interfaces.MessageParser;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @SuppressWarnings("serial")
-public class BasicSourcefireParser extends AbstractParser implements MessageParser{
+public class BasicSourcefireParser extends BasicParser {
+
+	private static final Logger _LOG = LoggerFactory
+					.getLogger(BasicSourcefireParser.class);
 
 	public static final String hostkey = "host";
 	String domain_name_regex = "([^\\.]+)\\.([a-z]{2}|[a-z]{3}|([a-z]{2}\\.[a-z]{2}))$";
@@ -34,12 +39,17 @@ public class BasicSourcefireParser extends AbstractParser implements MessagePars
 	Pattern sidPattern = Pattern.compile(sidRegex);	
 	Pattern pattern = Pattern.compile(domain_name_regex);
 
+	@Override
+	public void init() {
+
+	}
+
 	@SuppressWarnings({ "unchecked", "unused" })
-	public JSONObject parse(byte[] msg) {
+	public List<JSONObject> parse(byte[] msg) {
 
 		JSONObject payload = new JSONObject();
 		String toParse = "";
-
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 
 			toParse = new String(msg, "UTF-8");
@@ -80,8 +90,8 @@ public class BasicSourcefireParser extends AbstractParser implements MessagePars
 				payload.put("ip_dst_addr", dest);
 				dest_ip = dest;
 			}
-
-			payload.put("timestamp", System.currentTimeMillis());
+			long timestamp = System.currentTimeMillis();
+			payload.put("timestamp", timestamp);
 			
 			Matcher sidMatcher = sidPattern.matcher(toParse);
 			String originalString = null;
@@ -95,8 +105,8 @@ public class BasicSourcefireParser extends AbstractParser implements MessagePars
 			}
 			payload.put("original_string", originalString);
 			payload.put("signature_id", signatureId);
-
-			return payload;
+			messages.add(payload);
+			return messages;
 		} catch (Exception e) {
 			e.printStackTrace();
 			_LOG.error("Failed to parse: " + toParse);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
index 48823e4..41ed29d 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicYafParser.java
@@ -14,14 +14,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 
 /**
  * Created by rmerriman on 1/27/16.
  */
-public class BasicYafParser extends AbstractParser {
+public class BasicYafParser extends BasicParser {
 
 
   /**
@@ -104,17 +105,17 @@ public class BasicYafParser extends AbstractParser {
   }
 
   @SuppressWarnings("unchecked")
-  public JSONObject parse(byte[] msg) {
+  public List<JSONObject> parse(byte[] msg) {
     //return parseManual(msg);
     return parseWithGrok(msg);
   }
 
-  private JSONObject parseWithGrok(byte[] msg) {
+  private List<JSONObject> parseWithGrok(byte[] msg) {
     _LOG.trace("[Metron] Starting to parse incoming message with grok");
     JSONObject jsonMessage = new JSONObject();
+    List<JSONObject> messages = new ArrayList<>();
     try {
       String rawMessage = new String(msg, "UTF-8");
-      System.out.println("Received message: " + rawMessage);
 
       Match gm = grok.match(rawMessage);
       gm.captures();
@@ -123,8 +124,10 @@ public class BasicYafParser extends AbstractParser {
 
       jsonMessage.put("original_string", rawMessage);
       String startTime = (String) grokMap.get("start_time");
+      long timestamp = 0L;
       if (startTime != null) {
-        jsonMessage.put("timestamp", toEpoch(startTime));
+        timestamp = toEpoch(startTime);
+        jsonMessage.put("timestamp", timestamp);
       } else {
         jsonMessage.put("timestamp", "0");
       }
@@ -136,11 +139,12 @@ public class BasicYafParser extends AbstractParser {
       }
       jsonMessage.remove("YAF_DELIMITED");
       jsonMessage.remove("start_time");
+      messages.add(jsonMessage);
     } catch (Exception e) {
       e.printStackTrace();
       return null;
     }
-    return jsonMessage;
+    return messages;
   }
 
   private JSONObject parseManual(byte[] msg) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
index 1238899..9f526b4 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokAsaParser.java
@@ -1,27 +1,20 @@
 package org.apache.metron.parsing.parsers;
 
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import oi.thekraken.grok.api.exception.GrokException;
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Serializable;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import oi.thekraken.grok.api.exception.GrokException;
-
-import org.apache.commons.io.IOUtils;
-import org.json.simple.JSONObject;
+import java.util.*;
 
-public class GrokAsaParser extends AbstractParser implements Serializable {
+public class GrokAsaParser extends BasicParser {
 
 	private static final long serialVersionUID = 945353287115350798L;
 	private transient  Grok  grok;
@@ -214,11 +207,11 @@ public class GrokAsaParser extends AbstractParser implements Serializable {
 	}
 
 	@Override
-	public JSONObject parse(byte[] raw_message) {
+	public List<JSONObject> parse(byte[] raw_message) {
 
 		String toParse = "";
 		JSONObject toReturn;
-
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 
 			toParse = new String(raw_message, "UTF-8");
@@ -240,11 +233,11 @@ public class GrokAsaParser extends AbstractParser implements Serializable {
 			toReturn.putAll(response);
 
 			//System.out.println("*******I MAPPED: " + toReturn);
-
-			toReturn.put("timestamp", convertToEpoch(toReturn.get("MONTH").toString(), toReturn
-					.get("MONTHDAY").toString(), 
-					toReturn.get("TIME").toString(),
-					true));
+			long timestamp = convertToEpoch(toReturn.get("MONTH").toString(), toReturn
+											.get("MONTHDAY").toString(),
+							toReturn.get("TIME").toString(),
+							true);
+			toReturn.put("timestamp", timestamp);
 			
 			toReturn.remove("MONTHDAY");
 			toReturn.remove("TIME");
@@ -255,8 +248,8 @@ public class GrokAsaParser extends AbstractParser implements Serializable {
 			
 			toReturn.put("ip_src_addr", toReturn.remove("IPORHOST"));
 			toReturn.put("original_string", toParse);
-
-			return toReturn;
+			messages.add(toReturn);
+			return messages;
 
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
index 98dcffa..92fe6b6 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokSourcefireParser.java
@@ -1,15 +1,16 @@
 package org.apache.metron.parsing.parsers;
 
-import java.net.URL;
-
-import oi.thekraken.grok.api.Match;
 import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
 import oi.thekraken.grok.api.exception.GrokException;
-
 import org.json.simple.JSONObject;
 
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
 
-public class GrokSourcefireParser extends AbstractParser{
+public class GrokSourcefireParser extends BasicParser {
 	
 	/**
 	 * 
@@ -38,14 +39,19 @@ public class GrokSourcefireParser extends AbstractParser{
 		grok = Grok.create(filepath);
 		grok.compile("%{"+pattern+"}");
 	}
-	
+
 	@Override
-	public JSONObject parse(byte[] raw_message) {
+	public void init() {
+
+	}
+
+	@Override
+	public List<JSONObject> parse(byte[] raw_message) {
 		JSONObject payload = new JSONObject();
 		String toParse = "";
 		JSONObject toReturn;
-		
 
+		List<JSONObject> messages = new ArrayList<>();
 		try {
 
 			toParse = new String(raw_message, "UTF-8");
@@ -60,7 +66,8 @@ public class GrokSourcefireParser extends AbstractParser{
 			proto = proto.replace("{", "");
 			proto = proto.replace("}", "");
 			toReturn.put("protocol", proto);
-			return toReturn;
+			messages.add(toReturn);
+			return messages;
 			
 		}
 		catch(Exception e)

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java
deleted file mode 100644
index b8a6f6b..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokUtils.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.metron.parsing.parsers;
-import java.io.Serializable;
-
-import com.google.code.regexp.Pattern;
-
-public class GrokUtils implements Serializable {
-
-	private static final long serialVersionUID = 7465176887422419286L;
-	/**
-	   * Extract Grok patter like %{FOO} to FOO, Also Grok pattern with semantic.
-	   */
-	  public static final Pattern GROK_PATTERN = Pattern.compile(
-	      "%\\{" +
-	      "(?<name>" +
-	        "(?<pattern>[A-z0-9]+)" +
-	          "(?::(?<subname>[A-z0-9_:;\\/\\s\\.]+))?" +
-	          ")" +
-	          "(?:=(?<definition>" +
-	            "(?:" +
-	            "(?:[^{}]+|\\.+)+" +
-	            ")+" +
-	            ")" +
-	      ")?" +
-	      "\\}");
-
-	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
index 1ab12bc..8c69a4d 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/JSONCleaner.java
@@ -28,7 +28,7 @@ public class JSONCleaner implements Serializable {
 	 * Takes a json String as input and removes any Special Chars (^ a-z A-Z 0-9) in the keys
 	 */
 	@SuppressWarnings({"unchecked","rawtypes"})
-	public JSONObject Clean(String jsonString) throws ParseException
+	public JSONObject clean(String jsonString) throws ParseException
 	{
 		JSONParser parser = new JSONParser();
 		
@@ -54,7 +54,7 @@ public class JSONCleaner implements Serializable {
 		String jsonText = "{\"first_1\": 123, \"second\": [4, 5, 6], \"third\": 789}";
 		JSONCleaner cleaner = new JSONCleaner();
 		try {
-			//cleaner.Clean(jsonText);
+			//cleaner.clean(jsonText);
 			Map obj=new HashMap();
 			  obj.put("name","foo");
 			  obj.put("num",new Integer(100));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
index 44d9312..e3843d5 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/MetronGrok.java
@@ -13,6 +13,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.parsing.utils.GrokUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java
deleted file mode 100644
index a63889d..0000000
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/ParserUtils.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.metron.parsing.parsers;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.io.IOUtils;
-
-public class ParserUtils {
-	
-	public static final String PREFIX = "stream2file";
-	public static final String SUFFIX = ".tmp";
-
-	public static File stream2file(InputStream in) throws IOException {
-		final File tempFile = File.createTempFile(PREFIX, SUFFIX);
-		tempFile.deleteOnExit();
-		try (FileOutputStream out = new FileOutputStream(tempFile)) {
-			IOUtils.copy(in, out);
-		}
-		return tempFile;
-	}
-}