You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:31 UTC
[42/51] [abbrv] metron git commit: METRON-1820 Update to new
Simple-Syslog-5424 version to support error handling (ottobackwards) closes
apache/metron#1234
METRON-1820 Update to new Simple-Syslog-5424 version to support error handling (ottobackwards) closes apache/metron#1234
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/08f3de0f
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/08f3de0f
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/08f3de0f
Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 08f3de0fe31fefa828952cbe76456580a4697630
Parents: 6214150
Author: ottobackwards <ot...@gmail.com>
Authored: Mon Oct 15 16:36:36 2018 -0400
Committer: otto <ot...@apache.org>
Committed: Mon Oct 15 16:36:36 2018 -0400
----------------------------------------------------------------------
dependencies_with_url.csv | 2 +-
.../impl/SensorParserConfigServiceImpl.java | 65 +++++++++++++-
.../org/apache/metron/parsers/GrokParser.java | 18 +---
.../apache/metron/parsers/bolt/ParserBolt.java | 37 +++++++-
.../parsers/interfaces/MessageParser.java | 18 ----
.../interfaces/MultilineMessageParser.java | 51 +++++++++++
.../metron/parsers/syslog/Syslog5424Parser.java | 44 ++++++++--
.../apache/metron/parsers/GrokParserTest.java | 8 +-
.../metron/parsers/MultiLineGrokParserTest.java | 5 +-
.../metron/parsers/bolt/ParserBoltTest.java | 91 +++++++++++---------
.../parsers/syslog/Syslog5424ParserTest.java | 55 +++++++++---
.../websphere/GrokWebSphereParserTest.java | 56 ++++++++----
pom.xml | 2 +-
13 files changed, 335 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index fb6c03c..2e1eedd 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -484,4 +484,4 @@ org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
-com.github.palindromicity:simple-syslog-5424:jar:0.0.8:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424
+com.github.palindromicity:simple-syslog-5424:jar:0.0.9:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index 85b84b8..4cd272e 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.Path;
@@ -33,6 +34,8 @@ import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
@@ -138,13 +141,53 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
} else if (sensorParserConfig.getParserClassName() == null) {
throw new RestException("SensorParserConfig must have a parserClassName");
} else {
- MessageParser<JSONObject> parser;
+ MultilineMessageParser<JSONObject> parser;
+ Object parserObject;
try {
- parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
+ parserObject = Class.forName(sensorParserConfig.getParserClassName())
.newInstance();
} catch (Exception e) {
throw new RestException(e.toString(), e.getCause());
}
+
+ if (!(parserObject instanceof MultilineMessageParser)) {
+ parser = new MultilineMessageParser<JSONObject>() {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void configure(Map<String, Object> config) {
+ ((MessageParser<JSONObject>)parserObject).configure(config);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void init() {
+ ((MessageParser<JSONObject>)parserObject).init();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean validate(JSONObject message) {
+ return ((MessageParser<JSONObject>)parserObject).validate(message);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<JSONObject> parse(byte[] message) {
+ return ((MessageParser<JSONObject>)parserObject).parse(message);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Optional<List<JSONObject>> parseOptional(byte[] message) {
+ return ((MessageParser<JSONObject>)parserObject).parseOptional(message);
+ }
+ };
+ } else {
+ parser = (MultilineMessageParser<JSONObject>)parserObject;
+ }
+
+
Path temporaryGrokPath = null;
if (isGrokConfig(sensorParserConfig)) {
String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
@@ -152,13 +195,27 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
sensorParserConfig.getParserConfig()
.put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
}
+
parser.configure(sensorParserConfig.getParserConfig());
parser.init();
- JSONObject results = parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0);
+
+ Optional<MessageParserResult<JSONObject>> result = parser.parseOptionalResult(parseMessageRequest.getSampleData().getBytes());
+ if (!result.isPresent()) {
+ throw new RestException("Unknown error parsing sample data");
+ }
+
+ if (result.get().getMasterThrowable().isPresent()) {
+ throw new RestException("Error parsing sample data",result.get().getMasterThrowable().get());
+ }
+
+ if (result.get().getMessages().isEmpty()) {
+ throw new RestException("No results from parsing sample data");
+ }
+
if (isGrokConfig(sensorParserConfig) && temporaryGrokPath != null) {
grokService.deleteTemporary();
}
- return results;
+ return result.get().getMessages().get(0);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index bead477..a81149d 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@ import java.util.Optional;
import java.util.TimeZone;
-public class GrokParser implements MessageParser<JSONObject>, Serializable {
+public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -143,21 +144,6 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
@SuppressWarnings("unchecked")
@Override
- public List<JSONObject> parse(byte[] rawMessage) {
- Optional<MessageParserResult<JSONObject>> resultOptional = parseOptionalResult(rawMessage);
- if (!resultOptional.isPresent()) {
- return Collections.EMPTY_LIST;
- }
- Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables();
- if (!errors.isEmpty()) {
- throw new RuntimeException(errors.entrySet().iterator().next().getValue());
- }
-
- return resultOptional.get().getMessages();
- }
-
- @SuppressWarnings("unchecked")
- @Override
public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
if (grok == null) {
init();
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index ff5c1d4..05334c2 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -37,6 +37,7 @@ import org.apache.metron.parsers.filters.Filters;
import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.apache.metron.parsers.topology.ParserComponents;
import org.apache.metron.stellar.common.CachingStellarProcessor;
import org.apache.metron.stellar.dsl.Context;
@@ -307,9 +308,43 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
, sensorParserConfig.getReadMetadata()
, sensorParserConfig.getRawMessageStrategyConfig()
);
+
metadata = rawMessage.getMetadata();
- Optional<MessageParserResult<JSONObject>> results = parser.parseOptionalResult(rawMessage.getMessage());
+ MultilineMessageParser mmp = null;
+ if (!(parser instanceof MultilineMessageParser)) {
+ mmp = new MultilineMessageParser<JSONObject>() {
+
+ @Override
+ public void configure(Map<String, Object> config) {
+ parser.configure(config);
+ }
+
+ @Override
+ public void init() {
+ parser.init();
+ }
+
+ @Override
+ public boolean validate(JSONObject message) {
+ return parser.validate(message);
+ }
+
+ @Override
+ public List<JSONObject> parse(byte[] message) {
+ return parser.parse(message);
+ }
+
+ @Override
+ public Optional<List<JSONObject>> parseOptional(byte[] message) {
+ return parser.parseOptional(message);
+ }
+ };
+ } else {
+ mmp = (MultilineMessageParser) parser;
+ }
+
+ Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage());
// check if there is a master error
if (results.isPresent() && results.get().getMasterThrowable().isPresent()) {
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index f8243b9..665076b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -50,24 +50,6 @@ public interface MessageParser<T> extends Configurable {
}
/**
- * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore
- * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced
- * and the errors.
- * @param parseMessage the raw bytes of the message
- * @return Optional of {@link MessageParserResult}
- */
- default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
- List<T> list = new ArrayList<>();
- try {
- Optional<List<T>> optionalMessages = parseOptional(parseMessage);
- optionalMessages.ifPresent(list::addAll);
- } catch (Throwable t) {
- return Optional.of(new DefaultMessageParserResult<>(t));
- }
- return Optional.of(new DefaultMessageParserResult<T>(list));
- }
-
- /**
* Validate the message to ensure that it's correct.
* @param message the message to validate
* @return true if the message is valid, false if not
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
new file mode 100644
index 0000000..7818f9a
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.interfaces;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.metron.parsers.DefaultMessageParserResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public interface MultilineMessageParser<T> extends MessageParser<T> { // parser contract for raw input that may yield several messages and several errors
+
+ default List<T> parse(byte[] rawMessage) { // default body throws unconditionally; rawMessage is never read here
+ throw new NotImplementedException("parse is not implemented");
+ }
+
+ /**
+ * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore
+ * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced
+ * and the errors.
+ * <p>
+ * This default implementation delegates to {@code parseOptional} (not declared in this interface;
+ * presumably inherited from {@link MessageParser}). Any messages it yields are copied into a new list.
+ * Anything it throws is caught as {@link Throwable} and wrapped in a {@link DefaultMessageParserResult}
+ * instead of propagating to the caller. Both exit paths return {@code Optional.of(...)}, so this default
+ * never returns an empty Optional.
+ * @param parseMessage the raw bytes of the message
+ * @return Optional of {@link MessageParserResult}
+ */
+ default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+ List<T> list = new ArrayList<>();
+ try {
+ Optional<List<T>> optionalMessages = parseOptional(parseMessage);
+ optionalMessages.ifPresent(list::addAll); // an empty Optional leaves list empty rather than failing
+ } catch (Throwable t) { // deliberately broad: failures are reported through the result object
+ return Optional.of(new DefaultMessageParserResult<>(t));
+ }
+ return Optional.of(new DefaultMessageParserResult<T>(list));
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
index e3ad941..79a082a 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
@@ -25,23 +25,35 @@ import com.github.palindromicity.syslog.SyslogParserBuilder;
import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.parsers.BasicParser;
+import org.apache.metron.parsers.DefaultMessageParserResult;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.Reader;
+import java.io.Serializable;
import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Parser for well structured RFC 5424 messages.
*/
-public class Syslog5424Parser extends BasicParser {
+public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable {
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String NIL_POLICY_CONFIG = "nilPolicy";
private transient SyslogParser syslogParser;
@@ -62,15 +74,31 @@ public class Syslog5424Parser extends BasicParser {
}
@Override
+ public boolean validate(JSONObject message) { // true only when the message has both "original_string" and "timestamp" keys
+ JSONObject value = message; // alias only; refers to the same object as message
+ if (!(value.containsKey("original_string"))) {
+ LOG.trace("[Metron] Message does not have original_string: {}", message);
+ return false;
+ } else if (!(value.containsKey("timestamp"))) { // only key presence is checked, not the timestamp's type or value
+ LOG.trace("[Metron] Message does not have timestamp: {}", message);
+ return false;
+ } else {
+ LOG.trace("[Metron] Message conforms to schema: {}", message);
+ return true;
+ }
+ }
+
+ @Override
@SuppressWarnings("unchecked")
- public List<JSONObject> parse(byte[] rawMessage) {
+ public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
try {
if (rawMessage == null || rawMessage.length == 0) {
- return null;
+ return Optional.empty();
}
String originalString = new String(rawMessage);
List<JSONObject> returnList = new ArrayList<>();
+ Map<Object,Throwable> errorMap = new HashMap<>();
try (Reader reader = new BufferedReader(new StringReader(originalString))) {
syslogParser.parseLines(reader, (m) -> {
JSONObject jsonObject = new JSONObject(m);
@@ -79,14 +107,14 @@ public class Syslog5424Parser extends BasicParser {
jsonObject.put("original_string", originalString);
setTimestamp(jsonObject);
returnList.add(jsonObject);
- });
+ },errorMap::put);
- return returnList;
+ return Optional.of(new DefaultMessageParserResult<JSONObject>(returnList,errorMap));
}
- } catch (Exception e) {
- String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage();
+ } catch (IOException e) {
+ String message = "Unable to read buffer " + new String(rawMessage) + ": " + e.getMessage();
LOG.error(message, e);
- throw new IllegalStateException(message, e);
+ return Optional.of(new DefaultMessageParserResult<JSONObject>( new IllegalStateException(message, e)));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
index 7fa6a31..24386fa 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.parsers;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
@@ -30,6 +31,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public abstract class GrokParserTest {
@@ -53,8 +55,10 @@ public abstract class GrokParserTest {
JSONObject expected = (JSONObject) jsonParser.parse(e.getValue());
byte[] rawMessage = e.getKey().getBytes();
-
- List<JSONObject> parsedList = grokParser.parse(rawMessage);
+ Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage);
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> parsedList = resultOptional.get().getMessages();
Assert.assertEquals(1, parsedList.size());
compare(expected, parsedList.get(0));
}
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
index cc4d20f..e24a39d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
@@ -61,7 +61,10 @@ public class MultiLineGrokParserTest {
Map<String, String> testData = getTestData();
for (Map.Entry<String, String> e : testData.entrySet()) {
byte[] rawMessage = e.getKey().getBytes();
- List<JSONObject> parsedList = grokParser.parse(rawMessage);
+ Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage);
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> parsedList = resultOptional.get().getMessages();
Assert.assertEquals(10, parsedList.size());
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 0ae2817..e5e7180 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,29 +17,7 @@
*/
package org.apache.metron.parsers.bolt;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.hbase.util.Bytes;
@@ -55,10 +33,10 @@ import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageWriter;
import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.parsers.BasicParser;
import org.apache.metron.parsers.DefaultMessageParserResult;
import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.apache.metron.parsers.topology.ParserComponents;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.test.bolt.BaseBoltTest;
@@ -72,10 +50,33 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mock;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
public class ParserBoltTest extends BaseBoltTest {
@Mock
- private MessageParser<JSONObject> parser;
+ private MultilineMessageParser<JSONObject> parser;
@Mock
private MessageWriter<JSONObject> writer;
@@ -210,7 +211,7 @@ public class ParserBoltTest extends BaseBoltTest {
byte[] sampleBinary = "some binary message".getBytes();
when(tuple.getBinary(0)).thenReturn(sampleBinary);
- when(parser.parseOptional(sampleBinary)).thenReturn(null);
+ when(parser.parseOptionalResult(sampleBinary)).thenReturn(null);
parserBolt.execute(tuple);
verify(parser, times(0)).validate(any());
verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any());
@@ -405,9 +406,9 @@ public class ParserBoltTest extends BaseBoltTest {
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
+ when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
put("field2", "blah");
- }}))));
+ }})))));
parserBolt.execute(t1);
verify(batchWriter, times(0)).write(any(), any(), any(), any());
verify(outputCollector, times(1)).ack(t1);
@@ -437,26 +438,36 @@ public class ParserBoltTest extends BaseBoltTest {
String sensorType = "dummy";
RecordingWriter recordingWriter = new RecordingWriter();
//create a parser which acts like a basic parser but returns no timestamp field.
- BasicParser dummyParser = new BasicParser() {
+ MultilineMessageParser<JSONObject> dummyParser = new MultilineMessageParser<JSONObject>() {
@Override
- public void init() {
-
+ public void configure(Map<String, Object> config) {
}
@Override
- public List<JSONObject> parse(byte[] rawMessage) {
- return ImmutableList.of(new JSONObject() {{
- put("data", "foo");
- put("timestampstr", "2016-01-05 17:02:30");
- put("original_string", "blah");
- }});
+ public void init() {
}
@Override
- public void configure(Map<String, Object> config) {
+ public boolean validate(JSONObject message) {
+ Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName());
+ if (timestampObject instanceof Long) {
+ Long timestamp = (Long) timestampObject;
+ return timestamp > 0;
+ }
+ return false;
+ }
+ @Override
+ @SuppressWarnings("unchecked")
+ public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
+ return Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject() {{
+ put("data", "foo");
+ put("timestampstr", "2016-01-05 17:02:30");
+ put("original_string", "blah");
+ }})));
}
};
+
Map<String, ParserComponents> parserMap = Collections.singletonMap(
sensorType,
new ParserComponents(
@@ -502,7 +513,7 @@ public class ParserBoltTest extends BaseBoltTest {
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+ when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
BulkWriterResponse response = new BulkWriterResponse();
Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE];
@@ -592,7 +603,7 @@ public class ParserBoltTest extends BaseBoltTest {
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+ when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
BulkWriterResponse response = new BulkWriterResponse();
response.addSuccess(t1);
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
index 0ef26ff..b3e4507 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
@@ -20,6 +20,7 @@ package org.apache.metron.parsers.syslog;
import com.github.palindromicity.syslog.NilPolicy;
import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
@@ -28,6 +29,7 @@ import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Consumer;
public class Syslog5424ParserTest {
@@ -87,7 +89,7 @@ public class Syslog5424ParserTest {
});
}
- @Test(expected = IllegalStateException.class)
+ @Test()
public void testNotValid() {
test(null, "not valid", (message) -> Assert.assertTrue(false));
}
@@ -100,7 +102,7 @@ public class Syslog5424ParserTest {
}
parser.configure(config);
- List<JSONObject> output = parser.parse(line.getBytes());
+ parser.parseOptionalResult(line.getBytes());
}
@Test
@@ -116,8 +118,33 @@ public class Syslog5424ParserTest {
.append(SYSLOG_LINE_MISSING)
.append("\n")
.append(SYSLOG_LINE_ALL);
- List<JSONObject> output = parser.parse(builder.toString().getBytes());
- Assert.assertEquals(3,output.size());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(builder.toString().getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> parsedList = resultOptional.get().getMessages();
+ Assert.assertEquals(3,parsedList.size());
+ }
+
+ @Test
+ public void testReadMultiLineWithErrors() throws Exception {
+ Syslog5424Parser parser = new Syslog5424Parser();
+ Map<String, Object> config = new HashMap<>();
+ config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.DASH.name());
+ parser.configure(config);
+ StringBuilder builder = new StringBuilder();
+ builder
+ .append("HEREWEGO!!!!\n")
+ .append(SYSLOG_LINE_ALL)
+ .append("\n")
+ .append(SYSLOG_LINE_MISSING)
+ .append("\n")
+ .append("BOOM!\n")
+ .append(SYSLOG_LINE_ALL)
+ .append("\nOHMY!");
+ Optional<MessageParserResult<JSONObject>> output = parser.parseOptionalResult(builder.toString().getBytes());
+ Assert.assertTrue(output.isPresent());
+ Assert.assertEquals(3,output.get().getMessages().size());
+ Assert.assertEquals(3,output.get().getMessageThrowables().size());
}
@Test
@@ -126,21 +153,29 @@ public class Syslog5424ParserTest {
Map<String, Object> config = new HashMap<>();
config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.DASH.name());
parser.configure(config);
- List<JSONObject> output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes());
- String timeStampString = output.get(0).get("timestamp").toString();
+ Optional<MessageParserResult<JSONObject>> output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes());
+ Assert.assertNotNull(output);
+ Assert.assertTrue(output.isPresent());
+ String timeStampString = output.get().getMessages().get(0).get("timestamp").toString();
DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString);
config.clear();
config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.NULL.name());
parser.configure(config);
- output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes());
- timeStampString = output.get(0).get("timestamp").toString();
+ output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes());
+ Assert.assertNotNull(output);
+ Assert.assertTrue(output.isPresent());
+ timeStampString = output.get().getMessages().get(0).get("timestamp").toString();
DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString);
config.clear();
config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.OMIT.name());
parser.configure(config);
- output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes());
- timeStampString = output.get(0).get("timestamp").toString();
+
+ output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes());
+ Assert.assertNotNull(output);
+ Assert.assertTrue(output.isPresent());
+
+ timeStampString = output.get().getMessages().get(0).get("timestamp").toString();
DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
index 2923a4f..eb447d0 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
@@ -24,11 +24,14 @@ import java.time.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.log4j.Level;
import org.apache.metron.parsers.GrokParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.apache.metron.test.utils.UnitTestHelper;
import org.json.simple.JSONObject;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -54,7 +57,10 @@ public class GrokWebSphereParserTest {
parser.configure(parserConfig);
String testString = "<133>Apr 15 17:47:28 ABCXML1413 [rojOut][0x81000033][auth][notice] user(rick007): "
+ "[120.43.200.6]: User logged into 'cohlOut'.";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 47, 28, 0, UTC).toInstant().toEpochMilli();
@@ -73,14 +79,17 @@ public class GrokWebSphereParserTest {
}
@Test
- public void tetsParseLogoutLine() throws Exception {
+ public void testParseLogoutLine() throws Exception {
//Set up parser, parse message
GrokWebSphereParser parser = new GrokWebSphereParser();
parser.configure(parserConfig);
String testString = "<134>Apr 15 18:02:27 PHIXML3RWD [0x81000019][auth][info] [14.122.2.201]: "
+ "User 'hjpotter' logged out from 'default'.";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 18, 2, 27, 0, UTC).toInstant().toEpochMilli();
@@ -98,14 +107,17 @@ public class GrokWebSphereParserTest {
}
@Test
- public void tetsParseRBMLine() throws Exception {
+ public void testParseRBMLine() throws Exception {
//Set up parser, parse message
GrokWebSphereParser parser = new GrokWebSphereParser();
parser.configure(parserConfig);
String testString = "<131>Apr 15 17:36:35 ROBXML3QRS [0x80800018][auth][error] rbm(RBM-Settings): "
+ "trans(3502888135)[request] gtid(3502888135): RBM: Resource access denied.";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 36, 35, 0, UTC).toInstant().toEpochMilli();
@@ -122,16 +134,18 @@ public class GrokWebSphereParserTest {
}
@Test
- public void tetsParseOtherLine() throws Exception {
+ public void testParseOtherLine() throws Exception {
//Set up parser, parse message
GrokWebSphereParser parser = new GrokWebSphereParser();
parser.configure(parserConfig);
String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 [0x8240001c][audit][info] trans(191): (admin:default:system:*): "
+ "ntp-service 'NTP Service' - Operational state down";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
-
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 17, 34, 0, UTC).toInstant().toEpochMilli();
//Compare fields
@@ -153,7 +167,10 @@ public class GrokWebSphereParserTest {
parser.configure(parserConfig);
String testString = "<133>Apr 15 17:47:28 ABCXML1413 [rojOut][0x81000033][auth][notice] rick007): "
+ "[120.43.200. User logged into 'cohlOut'.";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 47, 28, 0, UTC).toInstant().toEpochMilli();
@@ -172,14 +189,17 @@ public class GrokWebSphereParserTest {
}
@Test
- public void tetsParseMalformedLogoutLine() throws Exception {
+ public void testParseMalformedLogoutLine() throws Exception {
//Set up parser, attempt to parse malformed message
GrokWebSphereParser parser = new GrokWebSphereParser();
parser.configure(parserConfig);
String testString = "<134>Apr 15 18:02:27 PHIXML3RWD [0x81000019][auth][info] [14.122.2.201: "
+ "User 'hjpotter' logged out from 'default.";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 18, 2, 27, 0, UTC).toInstant().toEpochMilli();
@@ -197,14 +217,17 @@ public class GrokWebSphereParserTest {
}
@Test
- public void tetsParseMalformedRBMLine() throws Exception {
+ public void testParseMalformedRBMLine() throws Exception {
//Set up parser, parse message
GrokWebSphereParser parser = new GrokWebSphereParser();
parser.configure(parserConfig);
String testString = "<131>Apr 15 17:36:35 ROBXML3QRS [0x80800018][auth][error] rbmRBM-Settings): "
+ "trans3502888135)[request] gtid3502888135) RBM: Resource access denied.";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 36, 35, 0, UTC).toInstant().toEpochMilli();
@@ -221,14 +244,17 @@ public class GrokWebSphereParserTest {
}
@Test
- public void tetsParseMalformedOtherLine() throws Exception {
+ public void testParseMalformedOtherLine() throws Exception {
//Set up parser, parse message
GrokWebSphereParser parser = new GrokWebSphereParser();
parser.configure(parserConfig);
String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 [0x8240001c][audit][info] trans 191) admindefaultsystem*): "
+ "ntp-service 'NTP Service' - Operational state down:";
- List<JSONObject> result = parser.parse(testString.getBytes());
+ Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes());
+ Assert.assertNotNull(resultOptional);
+ Assert.assertTrue(resultOptional.isPresent());
+ List<JSONObject> result = resultOptional.get().getMessages();
JSONObject parsedJSON = result.get(0);
long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 17, 34, 0, UTC).toInstant().toEpochMilli();
http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1e6adb0..f412036 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
<global_reflections_version>0.9.10</global_reflections_version>
<global_checkstyle_version>8.0</global_checkstyle_version>
<global_log4j_core_version>2.1</global_log4j_core_version>
- <global_simple_syslog_version>0.0.8</global_simple_syslog_version>
+ <global_simple_syslog_version>0.0.9</global_simple_syslog_version>
<global_spark_version>2.3.1</global_spark_version>
</properties>