You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:33 UTC
[44/51] [abbrv] metron git commit: METRON-1681 Decouple the
ParserBolt from the Parse execution logic (merrimanr) closes
apache/metron#1213
METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/28542ad6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/28542ad6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/28542ad6
Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 28542ad64cf63f17b728b4b1c0e995a8973767f7
Parents: 08f3de0
Author: merrimanr <me...@gmail.com>
Authored: Thu Oct 18 13:59:52 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Thu Oct 18 13:59:52 2018 -0500
----------------------------------------------------------------------
.../impl/SensorParserConfigServiceImpl.java | 51 +-
.../parsers/DefaultParserRunnerResults.java | 71 ++
.../org/apache/metron/parsers/GrokParser.java | 3 +-
.../org/apache/metron/parsers/ParserRunner.java | 60 ++
.../apache/metron/parsers/ParserRunnerImpl.java | 322 +++++++
.../metron/parsers/ParserRunnerResults.java | 33 +
.../apache/metron/parsers/bolt/ParserBolt.java | 381 +++-----
.../parsers/filters/BroMessageFilter.java | 2 +-
.../metron/parsers/filters/StellarFilter.java | 2 +-
.../parsers/interfaces/MessageFilter.java | 2 +-
.../parsers/interfaces/MessageParser.java | 27 +-
.../interfaces/MultilineMessageParser.java | 51 --
.../metron/parsers/syslog/Syslog5424Parser.java | 4 +-
.../parsers/topology/ParserComponent.java | 56 ++
.../parsers/topology/ParserComponents.java | 67 --
.../parsers/topology/ParserTopologyBuilder.java | 39 +-
.../org/apache/metron/filters/FiltersTest.java | 4 +-
.../metron/parsers/MessageParserTest.java | 108 ++-
.../metron/parsers/ParserRunnerImplTest.java | 390 +++++++++
.../metron/parsers/bolt/ParserBoltTest.java | 859 ++++++-------------
.../parsers/integration/ParserDriver.java | 60 +-
21 files changed, 1481 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index 4cd272e..d0e4b3d 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -20,12 +20,10 @@ package org.apache.metron.rest.service.impl;
import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.configuration.ConfigurationType;
@@ -35,18 +33,14 @@ import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
import org.apache.metron.rest.service.GrokService;
import org.apache.metron.rest.service.SensorParserConfigService;
import org.apache.metron.rest.util.ParserIndex;
-import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
-import org.reflections.Reflections;
-import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -141,53 +135,13 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
} else if (sensorParserConfig.getParserClassName() == null) {
throw new RestException("SensorParserConfig must have a parserClassName");
} else {
- MultilineMessageParser<JSONObject> parser;
- Object parserObject;
+ MessageParser<JSONObject> parser;
try {
- parserObject = Class.forName(sensorParserConfig.getParserClassName())
+ parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
.newInstance();
} catch (Exception e) {
throw new RestException(e.toString(), e.getCause());
}
-
- if (!(parserObject instanceof MultilineMessageParser)) {
- parser = new MultilineMessageParser<JSONObject>() {
-
- @Override
- @SuppressWarnings("unchecked")
- public void configure(Map<String, Object> config) {
- ((MessageParser<JSONObject>)parserObject).configure(config);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void init() {
- ((MessageParser<JSONObject>)parserObject).init();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public boolean validate(JSONObject message) {
- return ((MessageParser<JSONObject>)parserObject).validate(message);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public List<JSONObject> parse(byte[] message) {
- return ((MessageParser<JSONObject>)parserObject).parse(message);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Optional<List<JSONObject>> parseOptional(byte[] message) {
- return ((MessageParser<JSONObject>)parserObject).parseOptional(message);
- }
- };
- } else {
- parser = (MultilineMessageParser<JSONObject>)parserObject;
- }
-
-
Path temporaryGrokPath = null;
if (isGrokConfig(sensorParserConfig)) {
String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
@@ -195,7 +149,6 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
sensorParserConfig.getParserConfig()
.put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
}
-
parser.configure(sensorParserConfig.getParserConfig());
parser.init();
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
new file mode 100644
index 0000000..79a9b5d
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Default implementation of ParserRunnerResults.
+ */
+public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> {
+
+ private List<JSONObject> messages = new ArrayList<>();
+ private List<MetronError> errors = new ArrayList<>();
+
+ public List<JSONObject> getMessages() {
+ return messages;
+ }
+
+ public List<MetronError> getErrors() {
+ return errors;
+ }
+
+ public void addMessage(JSONObject message) {
+ this.messages.add(message);
+ }
+
+ public void addError(MetronError error) {
+ this.errors.add(error);
+ }
+
+ public void addErrors(List<MetronError> errors) {
+ this.errors.addAll(errors);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ParserRunnerResults parserResult = (ParserRunnerResults) o;
+ return Objects.equals(messages, parserResult.getMessages()) &&
+ Objects.equals(errors, parserResult.getErrors());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = messages != null ? messages.hashCode() : 0;
+ result = 31 * result + (errors != null ? errors.hashCode() : 0);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index a81149d..6bdfb81 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ import java.util.Optional;
import java.util.TimeZone;
-public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable {
+public class GrokParser implements MessageParser<JSONObject>, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
new file mode 100644
index 0000000..f9123b1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.stellar.dsl.Context;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser.
+ * The information needed to initialize a MessageParser is supplied by the parser config supplier. After the parsers
+ * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object
+ * that contains a list of parsed messages and/or a list of errors.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunner<T> {
+
+ /**
+ * Return a list of all sensor types that can be parsed with this ParserRunner.
+ * @return Sensor types
+ */
+ Set<String> getSensorTypes();
+
+ /**
+ *
+ * @param parserConfigSupplier Supplies parser configurations
+ * @param stellarContext Stellar context used to apply Stellar functions during field transformations
+ */
+ void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext);
+
+ /**
+ * Parses a message and either returns the message or an error.
+ * @param sensorType Sensor type of the message
+ * @param rawMessage Raw message including metadata
+ * @param parserConfigurations Parser configurations
+ * @return ParserRunnerResults containing a list of messages and a list of errors
+ */
+ ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations);
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
new file mode 100644
index 0000000..a986db7
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * The default implemention of a ParserRunner.
+ */
+public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable {
+
+ class ProcessResult {
+
+ private JSONObject message;
+ private MetronError error;
+
+ public ProcessResult(JSONObject message) {
+ this.message = message;
+ }
+
+ public ProcessResult(MetronError error) {
+ this.error = error;
+ }
+
+ public JSONObject getMessage() {
+ return message;
+ }
+
+ public MetronError getError() {
+ return error;
+ }
+
+ public boolean isError() {
+ return error != null;
+ }
+ }
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected transient Consumer<ParserRunnerResults> onSuccess;
+ protected transient Consumer<MetronError> onError;
+
+ private HashSet<String> sensorTypes;
+ private Map<String, ParserComponent> sensorToParserComponentMap;
+
+ // Stellar variables
+ private transient Context stellarContext;
+
+ public ParserRunnerImpl(HashSet<String> sensorTypes) {
+ this.sensorTypes = sensorTypes;
+ }
+
+ public Map<String, ParserComponent> getSensorToParserComponentMap() {
+ return sensorToParserComponentMap;
+ }
+
+ public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) {
+ this.sensorToParserComponentMap = sensorToParserComponentMap;
+ }
+
+ public Context getStellarContext() {
+ return stellarContext;
+ }
+
+ @Override
+ public Set<String> getSensorTypes() {
+ return sensorTypes;
+ }
+
+ @Override
+ public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) {
+ if (parserConfigSupplier == null) {
+ throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner.");
+ }
+ if (stellarContext == null) {
+ throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner.");
+ }
+ this.stellarContext = stellarContext;
+ initializeParsers(parserConfigSupplier);
+ }
+
+ /**
+ * Parses messages with the appropriate MessageParser based on sensor type. The resulting list of messages are then
+ * post-processed and added to the ParserRunnerResults message list. Any errors that happen during post-processing are
+ * added to the ParserRunnerResults error list. Any exceptions (including a master exception) thrown by the MessageParser
+ * are also added to the ParserRunnerResults error list.
+ *
+ * @param sensorType Sensor type of the message
+ * @param rawMessage Raw message including metadata
+ * @param parserConfigurations Parser configurations
+ * @return ParserRunnerResults containing a list of messages and a list of errors
+ */
+ @Override
+ public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+ DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+ SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+ if (sensorParserConfig != null) {
+ MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser();
+ Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage());
+ if (optionalMessageParserResult.isPresent()) {
+ MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get();
+
+ // Process each message returned from the MessageParser
+ messageParserResult.getMessages().forEach(message -> {
+ Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations);
+ if (processResult.isPresent()) {
+ if (processResult.get().isError()) {
+ parserRunnerResults.addError(processResult.get().getError());
+ } else {
+ parserRunnerResults.addMessage(processResult.get().getMessage());
+ }
+ }
+ });
+
+ // If a master exception is thrown by the MessageParser, wrap it with a MetronError and add it to the list of errors
+ messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withThrowable(throwable)
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(rawMessage.getMessage())));
+
+ // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors
+ parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withThrowable(entry.getValue())
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(entry.getKey())).collect(Collectors.toList()));
+ }
+ } else {
+ throw new IllegalStateException(String.format("Could not execute parser. Cannot find configuration for sensor %s.",
+ sensorType));
+ }
+ return parserRunnerResults;
+ }
+
+ /**
+ * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner. Objects are created
+ * using reflection and the MessageParser configure and init methods are called.
+ * @param parserConfigSupplier Parser configurations
+ */
+ private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) {
+ LOG.info("Initializing parsers...");
+ sensorToParserComponentMap = new HashMap<>();
+ for(String sensorType: sensorTypes) {
+ if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) {
+ throw new IllegalStateException(String.format("Could not initialize parsers. Cannot find configuration for sensor %s.",
+ sensorType));
+ }
+
+ SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType);
+
+ LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ",
+ sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName());
+
+ // create message parser
+ MessageParser<JSONObject> parser = ReflectionUtils
+ .createInstance(parserConfig.getParserClassName());
+
+ // create message filter
+ MessageFilter<JSONObject> filter = null;
+ parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext);
+ if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+ filter = Filters.get(
+ parserConfig.getFilterClassName(),
+ parserConfig.getParserConfig()
+ );
+ }
+
+ parser.configure(parserConfig.getParserConfig());
+ parser.init();
+ sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter));
+ }
+ }
+
+ /**
+ * Post-processes parsed messages by:
+ * <ul>
+ * <li>Applying field transformations defined in the sensor parser config</li>
+ * <li>Filtering messages using the configured MessageFilter class</li>
+ * <li>Validating messages using the MessageParser validate method</li>
+ * </ul>
+ * If a message is successfully processed a message is returned in a ProcessResult. If a message fails
+ * validation, a MetronError object is created and returned in a ProcessResult. If a message is
+ * filtered out an empty Optional is returned.
+ *
+ * @param sensorType Sensor type of the message
+ * @param message Message parsed by the MessageParser
+ * @param rawMessage Raw message including metadata
+ * @param parser MessageParser for the sensor type
+ * @param parserConfigurations Parser configurations
+ */
+ @SuppressWarnings("unchecked")
+ protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage,
+ MessageParser<JSONObject> parser,
+ ParserConfigurations parserConfigurations
+ ) {
+ Optional<ProcessResult> processResult = Optional.empty();
+ SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+ sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+ message,
+ rawMessage.getMetadata(),
+ sensorParserConfig.getMergeMetadata(),
+ sensorParserConfig.getRawMessageStrategyConfig()
+ );
+ message.put(Constants.SENSOR_TYPE, sensorType);
+ applyFieldTransformations(message, rawMessage, sensorParserConfig);
+ if (!message.containsKey(Constants.GUID)) {
+ message.put(Constants.GUID, UUID.randomUUID().toString());
+ }
+ MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter();
+ if (filter == null || filter.emit(message, stellarContext)) {
+ boolean isInvalid = !parser.validate(message);
+ List<FieldValidator> failedValidators = null;
+ if (!isInvalid) {
+ failedValidators = getFailedValidators(message, parserConfigurations);
+ isInvalid = !failedValidators.isEmpty();
+ }
+ if (isInvalid) {
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_INVALID)
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(message);
+ Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
+ .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+ .collect(Collectors.toSet());
+ if (errorFields != null && !errorFields.isEmpty()) {
+ error.withErrorFields(errorFields);
+ }
+ processResult = Optional.of(new ProcessResult(error));
+ } else {
+ processResult = Optional.of(new ProcessResult(message));
+ }
+ }
+ return processResult;
+ }
+
+ /**
+ * Applies Stellar field transformations defined in the sensor parser config.
+ * @param message Message parsed by the MessageParser
+ * @param rawMessage Raw message including metadata
+ * @param sensorParserConfig Sensor parser config
+ */
+ private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) {
+ for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+ if (handler != null) {
+ if (!sensorParserConfig.getMergeMetadata()) {
+ //if we haven't merged metadata, then we need to pass them along as configuration params.
+ handler.transformAndUpdate(
+ message,
+ stellarContext,
+ sensorParserConfig.getParserConfig(),
+ rawMessage.getMetadata()
+ );
+ } else {
+ handler.transformAndUpdate(
+ message,
+ stellarContext,
+ sensorParserConfig.getParserConfig()
+ );
+ }
+ }
+ }
+ }
+
+ private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) {
+ List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations();
+ List<FieldValidator> failedValidators = new ArrayList<>();
+ for(FieldValidator validator : fieldValidations) {
+ if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) {
+ failedValidators.add(validator);
+ }
+ }
+ return failedValidators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
new file mode 100644
index 0000000..7ca853c
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+
+import java.util.List;
+
+/**
+ * Container for the results of parsing a message with a ParserRunner.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunnerResults<T> {
+
+ List<T> getMessages();
+
+ List<MetronError> getErrors();
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 05334c2..a9ee305 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -18,13 +18,18 @@
package org.apache.metron.parsers.bolt;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
import com.github.benmanes.caffeine.cache.Cache;
-import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredParserBolt;
-import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
@@ -33,12 +38,8 @@ import org.apache.metron.common.message.MessageGetters;
import org.apache.metron.common.message.metadata.RawMessage;
import org.apache.metron.common.message.metadata.RawMessageUtil;
import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerResults;
import org.apache.metron.stellar.common.CachingStellarProcessor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -56,45 +57,32 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
public class ParserBolt extends ConfiguredParserBolt implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private OutputCollector collector;
- private Map<String, ParserComponents> sensorToComponentMap;
+ private ParserRunner<JSONObject> parserRunner;
+ private Map<String, WriterHandler> sensorToWriterMap;
private Map<String, String> topicToSensorMap = new HashMap<>();
- private Context stellarContext;
private transient MessageGetStrategy messageGetStrategy;
- private transient Cache<CachingStellarProcessor.Key, Object> cache;
private int requestedTickFreqSecs;
private int defaultBatchTimeout;
private int batchTimeoutDivisor = 1;
public ParserBolt( String zookeeperUrl
- , Map<String, ParserComponents> sensorToComponentMap
+ , ParserRunner parserRunner
+ , Map<String, WriterHandler> sensorToWriterMap
) {
super(zookeeperUrl);
- this.sensorToComponentMap = sensorToComponentMap;
+ this.parserRunner = parserRunner;
+ this.sensorToWriterMap = sensorToWriterMap;
// Ensure that all sensors are either bulk sensors or not bulk sensors. Can't mix and match.
Boolean handleAcks = null;
- for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
- boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+ for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+ boolean writerHandleAck = entry.getValue().handleAck();
if (handleAcks == null) {
handleAcks = writerHandleAck;
} else if (!handleAcks.equals(writerHandleAck)) {
@@ -130,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
/**
* Used only for unit testing
- * @param defaultBatchTimeout
*/
- protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
- this.defaultBatchTimeout = defaultBatchTimeout;
+ public int getBatchTimeoutDivisor() {
+ return batchTimeoutDivisor;
}
/**
* Used only for unit testing
*/
- public int getDefaultBatchTimeout() {
- return defaultBatchTimeout;
+ protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) {
+ this.sensorToWriterMap = sensorToWriterMap;
}
- public Map<String, ParserComponents> getSensorToComponentMap() {
- return sensorToComponentMap;
+ /**
+ * Used only for unit testing
+ */
+ protected Map<String, String> getTopicToSensorMap() {
+ return topicToSensorMap;
+ }
+
+ /**
+ * Used only for unit testing
+ */
+ protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) {
+ this.topicToSensorMap = topicToSensorMap;
+ }
+
+ /**
+ * Used only for unit testing
+ */
+ public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) {
+ this.messageGetStrategy = messageGetStrategy;
+ }
+
+ /**
+ * Used only for unit testing
+ */
+ public void setOutputCollector(OutputCollector collector) {
+ this.collector = collector;
}
/**
@@ -159,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
// to get the valid WriterConfiguration. But don't store any non-serializable objects,
// else Storm will throw a runtime error.
Function<WriterConfiguration, WriterConfiguration> configurationXform;
- WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+ WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue();
if (writer.isWriterToBulkWriter()) {
configurationXform = WriterToBulkWriter.TRANSFORMATION;
} else {
@@ -189,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
super.prepare(stormConf, context, collector);
messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
this.collector = collector;
-
- // Build the Stellar cache
- Map<String, Object> cacheConfig = new HashMap<>();
- for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
- String sensor = entry.getKey();
- SensorParserConfig config = getSensorParserConfig(sensor);
-
- if (config != null) {
- cacheConfig.putAll(config.getCacheConfig());
- }
- }
- cache = CachingStellarProcessor.createCache(cacheConfig);
+ this.parserRunner.init(this::getConfigurations, initializeStellar());
// Need to prep all sensors
- for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+ for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) {
String sensor = entry.getKey();
- MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
-
- initializeStellar();
- if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
- getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
- if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
- MessageFilter<JSONObject> filter = Filters.get(
- getSensorParserConfig(sensor).getFilterClassName(),
- getSensorParserConfig(sensor).getParserConfig()
- );
- getSensorToComponentMap().get(sensor).setFilter(filter);
- }
- }
-
- parser.init();
SensorParserConfig config = getSensorParserConfig(sensor);
if (config != null) {
@@ -229,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
throw new IllegalStateException(
"Unable to retrieve a parser config for " + sensor);
}
- parser.configure(config.getParserConfig());
- WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+ WriterHandler writer = sensorToWriterMap.get(sensor);
writer.init(stormConf, context, collector, getConfigurations());
if (defaultBatchTimeout == 0) {
//This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
@@ -246,225 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
}
}
- protected void initializeStellar() {
- Context.Builder builder = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
- .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
- ;
- if(cache != null) {
- builder = builder.with(Context.Capabilities.CACHE, () -> cache);
- }
- this.stellarContext = builder.build();
- StellarFunctions.initialize(stellarContext);
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
- try {
- for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
- entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "This should have been caught in the writerHandler. If you see this, file a JIRA", e);
- } finally {
- collector.ack(tuple);
- }
+ handleTickTuple(tuple);
return;
}
-
byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+ String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+ String sensorType = topicToSensorMap.get(topic);
try {
- SensorParserConfig sensorParserConfig;
- MessageParser<JSONObject> parser;
- String sensor;
- Map<String, Object> metadata;
- if (sensorToComponentMap.size() == 1) {
- // There's only one parser, so grab info directly
- Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
- .next();
- sensor = sensorParser.getKey();
- parser = sensorParser.getValue().getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- } else {
- // There's multiple parsers, so pull the topic from the Tuple and look up the sensor
- String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
- sensor = topicToSensorMap.get(topic);
- parser = sensorToComponentMap.get(sensor).getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- }
+ ParserConfigurations parserConfigurations = getConfigurations();
+ SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+ RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
+ , tuple
+ , originalMessage
+ , sensorParserConfig.getReadMetadata()
+ , sensorParserConfig.getRawMessageStrategyConfig()
+ );
+ ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations);
+ long numWritten = parserRunnerResults.getMessages().stream()
+ .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector))
+ .filter(result -> result)
+ .count();
+ parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error));
- List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
- boolean ackTuple = false;
- int numWritten = 0;
- if (sensorParserConfig != null) {
- RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
- , tuple
- , originalMessage
- , sensorParserConfig.getReadMetadata()
- , sensorParserConfig.getRawMessageStrategyConfig()
- );
-
- metadata = rawMessage.getMetadata();
-
- MultilineMessageParser mmp = null;
- if (!(parser instanceof MultilineMessageParser)) {
- mmp = new MultilineMessageParser<JSONObject>() {
-
- @Override
- public void configure(Map<String, Object> config) {
- parser.configure(config);
- }
-
- @Override
- public void init() {
- parser.init();
- }
-
- @Override
- public boolean validate(JSONObject message) {
- return parser.validate(message);
- }
-
- @Override
- public List<JSONObject> parse(byte[] message) {
- return parser.parse(message);
- }
-
- @Override
- public Optional<List<JSONObject>> parseOptional(byte[] message) {
- return parser.parseOptional(message);
- }
- };
- } else {
- mmp = (MultilineMessageParser) parser;
- }
-
- Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage());
-
- // check if there is a master error
- if (results.isPresent() && results.get().getMasterThrowable().isPresent()) {
- handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector);
- return;
- }
-
- // Handle the message results
- List<JSONObject> messages = results.isPresent() ? results.get().getMessages() : Collections.EMPTY_LIST;
- for (JSONObject message : messages) {
- //we want to ack the tuple in the situation where we have are not doing a bulk write
- //otherwise we want to defer to the writerComponent who will ack on bulk commit.
- WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
- ackTuple = !writer.handleAck();
-
- sensorParserConfig.getRawMessageStrategy().mergeMetadata(
- message,
- metadata,
- sensorParserConfig.getMergeMetadata(),
- sensorParserConfig.getRawMessageStrategyConfig()
- );
- message.put(Constants.SENSOR_TYPE, sensor);
-
- for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
- if (handler != null) {
- if (!sensorParserConfig.getMergeMetadata()) {
- //if we haven't merged metadata, then we need to pass them along as configuration params.
- handler.transformAndUpdate(
- message,
- stellarContext,
- sensorParserConfig.getParserConfig(),
- metadata
- );
- } else {
- handler.transformAndUpdate(
- message,
- stellarContext,
- sensorParserConfig.getParserConfig()
- );
- }
- }
- }
- if (!message.containsKey(Constants.GUID)) {
- message.put(Constants.GUID, UUID.randomUUID().toString());
- }
-
- MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
- if (filter == null || filter.emitTuple(message, stellarContext)) {
- boolean isInvalid = !parser.validate(message);
- List<FieldValidator> failedValidators = null;
- if (!isInvalid) {
- failedValidators = getFailedValidators(message, fieldValidations);
- isInvalid = !failedValidators.isEmpty();
- }
- if (isInvalid) {
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_INVALID)
- .withSensorType(Collections.singleton(sensor))
- .addRawMessage(message);
- Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
- .flatMap(fieldValidator -> fieldValidator.getInput().stream())
- .collect(Collectors.toSet());
- if (errorFields != null && !errorFields.isEmpty()) {
- error.withErrorFields(errorFields);
- }
- ErrorUtils.handleError(collector, error);
- } else {
- numWritten++;
- writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
- }
- }
- }
-
- // Handle the error results
- Map<Object, Throwable> messageErrors = results.isPresent()
- ? results.get().getMessageThrowables() : Collections.EMPTY_MAP;
-
- for (Entry<Object,Throwable> entry : messageErrors.entrySet()) {
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_ERROR)
- .withThrowable(entry.getValue())
- .withSensorType(sensorToComponentMap.keySet())
- .addRawMessage(originalMessage);
- ErrorUtils.handleError(collector, error);
- }
- }
//if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
//(meaning that none of the messages are valid either globally or locally)
//then we want to handle the ack ourselves.
- if (ackTuple || numWritten == 0) {
+ if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) {
collector.ack(tuple);
}
} catch (Throwable ex) {
- handleError(originalMessage, tuple, ex, collector);
+ handleError(sensorType, originalMessage, tuple, ex, collector);
+ collector.ack(tuple);
}
}
- protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+ protected Context initializeStellar() {
+ Map<String, Object> cacheConfig = new HashMap<>();
+ for (String sensorType: this.parserRunner.getSensorTypes()) {
+ SensorParserConfig config = getSensorParserConfig(sensorType);
+
+ if (config != null) {
+ cacheConfig.putAll(config.getCacheConfig());
+ }
+ }
+ Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig);
+
+ Context.Builder builder = new Context.Builder()
+ .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+ .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
+ ;
+ if(cache != null) {
+ builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+ }
+ Context stellarContext = builder.build();
+ StellarFunctions.initialize(stellarContext);
+ return stellarContext;
+ }
+
+ protected void handleTickTuple(Tuple tuple) {
+ try {
+ for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+ entry.getValue().flush(getConfigurations(), messageGetStrategy);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "This should have been caught in the writerHandler. If you see this, file a JIRA", e);
+ } finally {
+ collector.ack(tuple);
+ }
+ }
+
+ protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) {
+ WriterHandler writer = sensorToWriterMap.get(sensorType);
+ try {
+ writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy);
+ return true;
+ } catch (Exception ex) {
+ handleError(sensorType, originalMessage, tuple, ex, collector);
+ return false;
+ }
+ }
+
+ protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(ex)
- .withSensorType(sensorToComponentMap.keySet())
+ .withSensorType(Collections.singleton(sensorType))
.addRawMessage(originalMessage);
ErrorUtils.handleError(collector, error);
- collector.ack(tuple);
- }
-
- private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
- List<FieldValidator> failedValidators = new ArrayList<>();
- for(FieldValidator validator : validators) {
- if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) {
- failedValidators.add(validator);
- }
- }
- return failedValidators;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
}
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index 9cdafa3..1fa1feb 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{
*/
@Override
- public boolean emitTuple(JSONObject message, Context context) {
+ public boolean emit(JSONObject message, Context context) {
String protocol = (String) message.get(_key);
return _known_protocols.contains(protocol);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
index 15a035a..8300ff4 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
@@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> {
}
@Override
- public boolean emitTuple(JSONObject message, Context context) {
+ public boolean emit(JSONObject message, Context context) {
VariableResolver resolver = new MapVariableResolver(message);
return processor.parse(query, resolver, functionResolver, context);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index b7b91c0..207c070 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context;
public interface MessageFilter<T> extends Configurable{
- boolean emitTuple(T message, Context context);
+ boolean emit(T message, Context context);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 665076b..c9f8351 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.parsers.interfaces;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.metron.parsers.DefaultMessageParserResult;
import java.io.Serializable;
@@ -38,18 +39,42 @@ public interface MessageParser<T> extends Configurable {
* @param rawMessage the raw bytes of the message
* @return If null is returned, this is treated as an empty list.
*/
- List<T> parse(byte[] rawMessage);
+ @Deprecated
+ default List<T> parse(byte[] rawMessage) {
+ throw new NotImplementedException("parse is not implemented");
+ }
/**
* Take raw data and convert it to an optional list of messages.
* @param parseMessage the raw bytes of the message
* @return If null is returned, this is treated as an empty list.
*/
+ @Deprecated
default Optional<List<T>> parseOptional(byte[] parseMessage) {
return Optional.ofNullable(parse(parseMessage));
}
/**
+ * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore
+ * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced
+ * and the errors.
+ * @param parseMessage the raw bytes of the message
+ * @return Optional of {@link MessageParserResult}
+ */
+ default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+ Optional<MessageParserResult<T>> result = Optional.empty();
+ try {
+ Optional<List<T>> optionalMessages = parseOptional(parseMessage);
+ if (optionalMessages.isPresent()) {
+ result = Optional.of(new DefaultMessageParserResult<>(optionalMessages.get()));
+ }
+ } catch (Throwable t) {
+ return Optional.of(new DefaultMessageParserResult<>(t));
+ }
+ return result;
+ }
+
+ /**
* Validate the message to ensure that it's correct.
* @param message the message to validate
* @return true if the message is valid, false if not
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
deleted file mode 100644
index 7818f9a..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.interfaces;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.metron.parsers.DefaultMessageParserResult;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public interface MultilineMessageParser<T> extends MessageParser<T> {
-
- default List<T> parse(byte[] rawMessage) {
- throw new NotImplementedException("parse is not implemented");
- }
-
- /**
- * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore
- * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced
- * and the errors.
- * @param parseMessage the raw bytes of the message
- * @return Optional of {@link MessageParserResult}
- */
- default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
- List<T> list = new ArrayList<>();
- try {
- Optional<List<T>> optionalMessages = parseOptional(parseMessage);
- optionalMessages.ifPresent(list::addAll);
- } catch (Throwable t) {
- return Optional.of(new DefaultMessageParserResult<>(t));
- }
- return Optional.of(new DefaultMessageParserResult<T>(list));
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
index 79a082a..5b62e85 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
@@ -26,8 +26,8 @@ import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.parsers.BasicParser;
import org.apache.metron.parsers.DefaultMessageParserResult;
+import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import java.util.Optional;
/**
* Parser for well structured RFC 5424 messages.
*/
-public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable {
+public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String NIL_POLICY_CONFIG = "nilPolicy";
private transient SyslogParser syslogParser;
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
new file mode 100644
index 0000000..eb5ff9f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
+public class ParserComponent implements Serializable {
+ private static final long serialVersionUID = 7880346740026374665L;
+
+ private MessageParser<JSONObject> messageParser;
+ private MessageFilter<JSONObject> filter;
+
+ public ParserComponent(
+ MessageParser<JSONObject> messageParser,
+ MessageFilter<JSONObject> filter) {
+ this.messageParser = messageParser;
+ this.filter = filter;
+ }
+
+ public MessageParser<JSONObject> getMessageParser() {
+ return messageParser;
+ }
+
+ public MessageFilter<JSONObject> getFilter() {
+ return filter;
+ }
+
+ public void setMessageParser(
+ MessageParser<JSONObject> messageParser) {
+ this.messageParser = messageParser;
+ }
+
+ public void setFilter(
+ MessageFilter<JSONObject> filter) {
+ this.filter = filter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
deleted file mode 100644
index 32d56b9..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.topology;
-
-import java.io.Serializable;
-import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-
-public class ParserComponents implements Serializable {
- private static final long serialVersionUID = 7880346740026374665L;
-
- private MessageParser<JSONObject> messageParser;
- private MessageFilter<JSONObject> filter;
- private WriterHandler writer;
-
- public ParserComponents(
- MessageParser<JSONObject> messageParser,
- MessageFilter<JSONObject> filter,
- WriterHandler writer) {
- this.messageParser = messageParser;
- this.filter = filter;
- this.writer = writer;
- }
-
- public MessageParser<JSONObject> getMessageParser() {
- return messageParser;
- }
-
- public MessageFilter<JSONObject> getFilter() {
- return filter;
- }
-
- public WriterHandler getWriter() {
- return writer;
- }
-
- public void setMessageParser(
- MessageParser<JSONObject> messageParser) {
- this.messageParser = messageParser;
- }
-
- public void setFilter(
- MessageFilter<JSONObject> filter) {
- this.filter = filter;
- }
-
- public void setWriter(WriterHandler writer) {
- this.writer = writer;
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d20e1a5..9dc7b88 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,11 +21,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.metron.common.Constants;
@@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils;
import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.parsers.ParserRunnerImpl;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.topology.config.ValueSupplier;
import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
@@ -268,29 +266,14 @@ public class ParserTopologyBuilder {
Optional<String> securityProtocol,
ParserConfigurations configs,
Optional<String> outputTopic) {
-
- Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+ Map<String, WriterHandler> writerConfigs = new HashMap<>();
for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
String sensorType = entry.getKey();
SensorParserConfig parserConfig = entry.getValue();
- // create message parser
- MessageParser<JSONObject> parser = ReflectionUtils
- .createInstance(parserConfig.getParserClassName());
- parser.configure(parserConfig.getParserConfig());
-
- // create message filter
- MessageFilter<JSONObject> filter = null;
- if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
- filter = Filters.get(
- parserConfig.getFilterClassName(),
- parserConfig.getParserConfig()
- );
- }
// create a writer
AbstractWriter writer;
if (parserConfig.getWriterClassName() == null) {
-
// if not configured, use a sensible default
writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
.withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
@@ -304,16 +287,10 @@ public class ParserTopologyBuilder {
// create a writer handler
WriterHandler writerHandler = createWriterHandler(writer);
-
- ParserComponents components = new ParserComponents(
- parser,
- filter,
- writerHandler
- );
- parserBoltConfigs.put(sensorType, components);
+ writerConfigs.put(sensorType, writerHandler);
}
- return new ParserBolt(zookeeperUrl, parserBoltConfigs);
+ return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs);
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
index 8441409..2f3784a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -42,8 +42,8 @@ public class FiltersTest {
put("filter.query", "exists(foo)");
}};
MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config);
- Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
- Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
+ Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
+ Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
index 9769baa..4842a1f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
@@ -19,68 +19,118 @@
package org.apache.metron.parsers;
import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class MessageParserTest {
- @Test
- public void testNullable() throws Exception {
- MessageParser parser = new MessageParser() {
- @Override
- public void init() {
- }
+ abstract class TestMessageParser implements MessageParser<JSONObject> {
+ @Override
+ public void init() {
+ }
- @Override
- public List parse(byte[] rawMessage) {
- return null;
- }
+ @Override
+ public boolean validate(JSONObject message) {
+ return false;
+ }
- @Override
- public boolean validate(Object message) {
- return false;
- }
+ @Override
+ public void configure(Map<String, Object> config) {
- @Override
- public void configure(Map<String, Object> config) {
+ }
+ }
+ @Test
+ public void testNullable() throws Exception {
+ MessageParser parser = new TestMessageParser() {
+ @Override
+ public List<JSONObject> parse(byte[] rawMessage) {
+ return null;
}
};
- Assert.assertNotNull(parser.parseOptional(null));
- Assert.assertFalse(parser.parseOptional(null).isPresent());
+ Assert.assertNotNull(parser.parseOptionalResult(null));
+ Assert.assertFalse(parser.parseOptionalResult(null).isPresent());
}
@Test
public void testNotNullable() throws Exception {
- MessageParser parser = new MessageParser() {
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public void init() {
-
+ public List<JSONObject> parse(byte[] rawMessage) {
+ return new ArrayList<>();
}
+ };
+ Assert.assertNotNull(parser.parseOptionalResult(null));
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null);
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertEquals(0, ret.get().getMessages().size());
+ }
+ @Test
+ public void testParse() {
+ JSONObject message = new JSONObject();
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public List parse(byte[] rawMessage) {
- return new ArrayList<>();
+ public List<JSONObject> parse(byte[] rawMessage) {
+ return Collections.singletonList(message);
}
+ };
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertEquals(1, ret.get().getMessages().size());
+ Assert.assertEquals(message, ret.get().getMessages().get(0));
+ }
+ @Test
+ public void testParseOptional() {
+ JSONObject message = new JSONObject();
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public boolean validate(Object message) {
- return false;
+ public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+ return Optional.of(Collections.singletonList(message));
}
+ };
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertEquals(1, ret.get().getMessages().size());
+ Assert.assertEquals(message, ret.get().getMessages().get(0));
+ }
+ @Test
+ public void testParseException() {
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public void configure(Map<String, Object> config) {
+ public List<JSONObject> parse(byte[] rawMessage) {
+ throw new RuntimeException("parse exception");
+ }
+ };
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+ Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
+ }
+ @Test
+ public void testParseOptionalException() {
+ MessageParser<JSONObject> parser = new TestMessageParser() {
+ @Override
+ public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+ throw new RuntimeException("parse exception");
}
};
- Assert.assertNotNull(parser.parseOptional(null));
- Optional<List> ret = parser.parseOptional(null);
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
Assert.assertTrue(ret.isPresent());
- Assert.assertEquals(0, ret.get().size());
+ Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+ Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
}
+
}