You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/10/18 19:00:06 UTC
[1/2] metron git commit: METRON-1681 Decouple the ParserBolt from the
Parse execution logic (merrimanr) closes apache/metron#1213
Repository: metron
Updated Branches:
refs/heads/master 08f3de0fe -> 28542ad64
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
new file mode 100644
index 0000000..5f05b24
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.filters.StellarFilter;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.parsers.ParserRunnerImpl.ProcessResult;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ParserRunnerImpl.class, ReflectionUtils.class, Filters.class})
+public class ParserRunnerImplTest {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none(); // tests set the expected exception type and message on this rule before acting
+
+ /**
+ {
+ "fieldValidations" : [
+ {
+ "input" : [ "ip_src_addr", "ip_dst_addr"],
+ "validation" : "IP"
+ }
+ ]
+ }
+ */
+ @Multiline
+ private String globalConfigString; // global config JSON; setup() loads it via JSONUtils and passes it to updateGlobalConfig
+
+ /**
+ {
+ "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+ "filterClassName":"org.apache.metron.parsers.filters.StellarFilter",
+ "sensorTopic":"bro",
+ "parserConfig": {
+ "field": "value"
+ },
+ "fieldTransformations" : [
+ {
+ "input" : "field1",
+ "transformation" : "REMOVE"
+ }
+ ]
+ }
+ */
+ @Multiline
+ private String broConfigString; // "bro" sensor config JSON; setup() turns it into a SensorParserConfig registered under "bro"
+
+ /**
+ {
+ "parserClassName":"org.apache.metron.parsers.snort.BasicSnortParser",
+ "sensorTopic":"snort",
+ "parserConfig": {}
+ }
+ */
+ @Multiline
+ private String snortConfigString; // "snort" sensor config JSON (no filterClassName); setup() registers it under "snort"
+
+ private ParserConfigurations parserConfigurations; // recreated in setup() with the bro, snort and global configs
+ private MessageParser<JSONObject> broParser; // mock that the stubbed ReflectionUtils.createInstance returns for BasicBroParser
+ private MessageParser<JSONObject> snortParser; // mock that the stubbed ReflectionUtils.createInstance returns for BasicSnortParser
+ private MessageFilter<JSONObject> stellarFilter; // StellarFilter mock that the stubbed Filters.get returns for the bro parser config
+ private ParserRunnerImpl parserRunner; // object under test; setup() creates it for the sensors "bro" and "snort"
+
+
+ /**
+  * Builds the fixture shared by every test: parser configurations for the
+  * "bro" and "snort" sensors plus the global config, a fresh runner for both
+  * sensors, and mocks for the parsers and the filter. The static factories
+  * {@code ReflectionUtils.createInstance} and {@code Filters.get} are stubbed
+  * to hand back those mocks.
+  *
+  * @throws IOException if one of the JSON configuration strings cannot be parsed
+  */
+ @Before
+ public void setup() throws IOException {
+   // Encode with an explicit charset so the bytes handed to fromBytes() do not
+   // depend on the platform default encoding of the machine running the test.
+   SensorParserConfig broConfig = SensorParserConfig.fromBytes(broConfigString.getBytes(StandardCharsets.UTF_8));
+   SensorParserConfig snortConfig = SensorParserConfig.fromBytes(snortConfigString.getBytes(StandardCharsets.UTF_8));
+
+   parserConfigurations = new ParserConfigurations();
+   parserConfigurations.updateSensorParserConfig("bro", broConfig);
+   parserConfigurations.updateSensorParserConfig("snort", snortConfig);
+   parserConfigurations.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfigString, JSONUtils.MAP_SUPPLIER));
+
+   parserRunner = new ParserRunnerImpl(new HashSet<>(Arrays.asList("bro", "snort")));
+
+   broParser = mock(MessageParser.class);
+   snortParser = mock(MessageParser.class);
+   stellarFilter = mock(StellarFilter.class);
+
+   // Route the static factory calls to the mocks above.
+   mockStatic(ReflectionUtils.class);
+   mockStatic(Filters.class);
+   when(ReflectionUtils.createInstance("org.apache.metron.parsers.bro.BasicBroParser")).thenReturn(broParser);
+   when(ReflectionUtils.createInstance("org.apache.metron.parsers.snort.BasicSnortParser")).thenReturn(snortParser);
+   when(Filters.get("org.apache.metron.parsers.filters.StellarFilter", broConfig.getParserConfig()))
+       .thenReturn(stellarFilter);
+ }
+
+ /**
+  * Expects init() to fail with an IllegalStateException when no parser
+  * configuration supplier is given.
+  */
+ @Test
+ public void shouldThrowExceptionOnEmptyParserSupplier() {
+   // Both the exception type and its message are checked by the rule.
+   exception.expectMessage("A parser config supplier must be set before initializing the ParserRunner.");
+   exception.expect(IllegalStateException.class);
+
+   // Neither a supplier nor a Stellar context is provided.
+   parserRunner.init(null, null);
+ }
+
+ @Test
+ public void shouldThrowExceptionOnEmptyStellarContext() {
+ exception.expect(IllegalStateException.class);
+ exception.expectMessage("A stellar context must be set before initializing the ParserRunner.");
+
+ parserRunner.init(() -> parserConfigurations, null);
+ }
+
+ @Test
+ public void initShouldThrowExceptionOnMissingSensorParserConfig() {
+ exception.expect(IllegalStateException.class);
+ exception.expectMessage("Could not initialize parsers. Cannot find configuration for sensor test.");
+
+ parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+ add("test");
+ }});
+
+ parserRunner.init(() -> parserConfigurations, mock(Context.class));
+ }
+
+ @Test
+ public void executeShouldThrowExceptionOnMissingSensorParserConfig() {
+ exception.expect(IllegalStateException.class);
+ exception.expectMessage("Could not execute parser. Cannot find configuration for sensor test.");
+
+ parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+ add("test");
+ }});
+
+ parserRunner.execute("test", mock(RawMessage.class), parserConfigurations);
+ }
+
+ /**
+  * Checks that init() stores the Stellar context and builds one parser
+  * component per configured sensor, calling init() and configure() exactly
+  * once on each parser.
+  */
+ @Test
+ public void shouldInit() throws Exception {
+   Context expectedContext = mock(Context.class);
+   Map<String, Object> expectedBroConfig = parserConfigurations.getSensorParserConfig("bro").getParserConfig();
+   Map<String, Object> expectedSnortConfig = parserConfigurations.getSensorParserConfig("snort").getParserConfig();
+
+   parserRunner.init(() -> parserConfigurations, expectedContext);
+
+   // The runner exposes the context it was initialized with.
+   Assert.assertEquals(expectedContext, parserRunner.getStellarContext());
+
+   // One component for each of the two sensors.
+   Map<String, ParserComponent> components = parserRunner.getSensorToParserComponentMap();
+   Assert.assertEquals(2, components.size());
+
+   // Bro: parser and filter come from the stubbed factories; the filter is left untouched.
+   ParserComponent bro = components.get("bro");
+   Assert.assertEquals(broParser, bro.getMessageParser());
+   Assert.assertEquals(stellarFilter, bro.getFilter());
+   verify(broParser).init();
+   verify(broParser).configure(expectedBroConfig);
+   verifyNoMoreInteractions(broParser);
+   verifyNoMoreInteractions(stellarFilter);
+
+   // Snort: its config names no filter class, so the component carries none.
+   ParserComponent snort = components.get("snort");
+   Assert.assertEquals(snortParser, snort.getMessageParser());
+   Assert.assertNull(snort.getFilter());
+   verify(snortParser).init();
+   verify(snortParser).configure(expectedSnortConfig);
+   verifyNoMoreInteractions(snortParser);
+ }
+
+ /**
+  * Checks that execute() combines the outcomes of a parse: of two parsed
+  * messages one is processed successfully and one yields an error, and each of
+  * the two per-message parse failures becomes a PARSER_ERROR. Expected result:
+  * one message and three errors.
+  */
+ @Test
+ public void shouldExecute() {
+   // processMessage() is stubbed below, so the runner has to be a spy.
+   parserRunner = spy(parserRunner);
+   RawMessage input = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+   // Messages the parser reports as successfully parsed.
+   JSONObject firstParsed = new JSONObject();
+   firstParsed.put("field", "parsedMessage1");
+   JSONObject secondParsed = new JSONObject();
+   secondParsed.put("field", "parsedMessage2");
+
+   // Raw messages the parser reports as failed, each with its throwable.
+   Object failedRaw1 = new RawMessage("raw_message1".getBytes(), new HashMap<>());
+   Object failedRaw2 = new RawMessage("raw_message2".getBytes(), new HashMap<>());
+   Throwable failure1 = mock(Throwable.class);
+   Throwable failure2 = mock(Throwable.class);
+   Map<Object, Throwable> parseFailures = new HashMap<>();
+   parseFailures.put(failedRaw1, failure1);
+   parseFailures.put(failedRaw2, failure2);
+   MessageParserResult<JSONObject> parserResult =
+       new DefaultMessageParserResult<>(Arrays.asList(firstParsed, secondParsed), parseFailures);
+
+   // Canned results for the two processMessage() calls.
+   JSONObject processedMessage = new JSONObject();
+   processedMessage.put("field", "processedMessage1");
+   MetronError processedError = new MetronError().withMessage("processedError");
+   ProcessResult successResult = mock(ProcessResult.class);
+   ProcessResult errorResult = mock(ProcessResult.class);
+
+   when(broParser.parseOptionalResult(input.getMessage())).thenReturn(Optional.of(parserResult));
+   when(successResult.getMessage()).thenReturn(processedMessage);
+   when(errorResult.isError()).thenReturn(true);
+   when(errorResult.getError()).thenReturn(processedError);
+   doReturn(Optional.of(successResult)).when(parserRunner)
+       .processMessage("bro", firstParsed, input, broParser, parserConfigurations);
+   doReturn(Optional.of(errorResult)).when(parserRunner)
+       .processMessage("bro", secondParsed, input, broParser, parserConfigurations);
+
+   MetronError expectedFailure1 = new MetronError()
+       .withErrorType(Constants.ErrorType.PARSER_ERROR)
+       .withThrowable(failure1)
+       .withSensorType(Collections.singleton("bro"))
+       .addRawMessage(failedRaw1);
+   MetronError expectedFailure2 = new MetronError()
+       .withErrorType(Constants.ErrorType.PARSER_ERROR)
+       .withThrowable(failure2)
+       .withSensorType(Collections.singleton("bro"))
+       .addRawMessage(failedRaw2);
+
+   Map<String, ParserComponent> components = new HashMap<>();
+   components.put("bro", new ParserComponent(broParser, stellarFilter));
+   parserRunner.setSensorToParserComponentMap(components);
+
+   ParserRunnerResults<JSONObject> results = parserRunner.execute("bro", input, parserConfigurations);
+
+   Assert.assertEquals(1, results.getMessages().size());
+   Assert.assertTrue(results.getMessages().contains(processedMessage));
+   Assert.assertEquals(3, results.getErrors().size());
+   Assert.assertTrue(results.getErrors().contains(processedError));
+   Assert.assertTrue(results.getErrors().contains(expectedFailure1));
+   Assert.assertTrue(results.getErrors().contains(expectedFailure2));
+ }
+
+ /**
+  * Checks that when the parser result carries only a master throwable,
+  * execute() never calls processMessage() and reports a single PARSER_ERROR
+  * built from that throwable and the raw message bytes.
+  */
+ @Test
+ public void shouldExecuteWithMasterThrowable() {
+   // A spy is needed so the processMessage() interaction can be verified.
+   parserRunner = spy(parserRunner);
+   RawMessage input = new RawMessage("raw_message".getBytes(), new HashMap<>());
+   Throwable masterFailure = mock(Throwable.class);
+   MessageParserResult<JSONObject> parserResult = new DefaultMessageParserResult<>(masterFailure);
+
+   when(broParser.parseOptionalResult(input.getMessage())).thenReturn(Optional.of(parserResult));
+
+   Map<String, ParserComponent> components = new HashMap<>();
+   components.put("bro", new ParserComponent(broParser, stellarFilter));
+   parserRunner.setSensorToParserComponentMap(components);
+
+   ParserRunnerResults<JSONObject> results = parserRunner.execute("bro", input, parserConfigurations);
+
+   // No message may reach the processing step.
+   verify(parserRunner, times(0))
+       .processMessage(any(), any(), any(), any(), any());
+
+   MetronError expectedError = new MetronError()
+       .withErrorType(Constants.ErrorType.PARSER_ERROR)
+       .withThrowable(masterFailure)
+       .withSensorType(Collections.singleton("bro"))
+       .addRawMessage(input.getMessage());
+   Assert.assertEquals(1, results.getErrors().size());
+   Assert.assertTrue(results.getErrors().contains(expectedError));
+ }
+
+ /**
+  * Checks the success path of processMessage(): the expected output carries
+  * "source.type" = "bro" and no longer contains "field1", and the result is
+  * present and not an error.
+  */
+ @Test
+ public void shouldPopulateMessagesOnProcessMessage() {
+   RawMessage raw = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+   JSONObject parsed = new JSONObject();
+   parsed.put("guid", "guid");
+   parsed.put("ip_src_addr", "192.168.1.1");
+   parsed.put("ip_dst_addr", "192.168.1.2");
+   parsed.put("field1", "value");
+
+   // Same content minus "field1", plus the sensor type.
+   JSONObject expected = new JSONObject();
+   expected.put("guid", "guid");
+   expected.put("source.type", "bro");
+   expected.put("ip_src_addr", "192.168.1.1");
+   expected.put("ip_dst_addr", "192.168.1.2");
+
+   // Filter lets the message through and the parser accepts it.
+   when(stellarFilter.emit(expected, parserRunner.getStellarContext())).thenReturn(true);
+   when(broParser.validate(expected)).thenReturn(true);
+
+   Map<String, ParserComponent> components = new HashMap<>();
+   components.put("bro", new ParserComponent(broParser, stellarFilter));
+   parserRunner.setSensorToParserComponentMap(components);
+
+   Optional<ProcessResult> outcome =
+       parserRunner.processMessage("bro", parsed, raw, broParser, parserConfigurations);
+
+   Assert.assertTrue(outcome.isPresent());
+   Assert.assertFalse(outcome.get().isError());
+   Assert.assertEquals(expected, outcome.get().getMessage());
+ }
+
+ /**
+  * Checks that processMessage() returns a PARSER_INVALID error result when
+  * the parser's validate() rejects the message.
+  */
+ @Test
+ public void shouldReturnMetronErrorOnInvalidMessage() {
+   RawMessage raw = new RawMessage("raw_message".getBytes(), new HashMap<>());
+   JSONObject parsed = new JSONObject();
+   parsed.put("guid", "guid");
+
+   // Message content used to stub the filter and the validator.
+   JSONObject enriched = new JSONObject();
+   enriched.put("guid", "guid");
+   enriched.put("source.type", "bro");
+
+   MetronError expectedError = new MetronError()
+       .withErrorType(Constants.ErrorType.PARSER_INVALID)
+       .withSensorType(Collections.singleton("bro"))
+       .addRawMessage(parsed);
+
+   // The filter passes the message, but the parser declares it invalid.
+   when(stellarFilter.emit(enriched, parserRunner.getStellarContext())).thenReturn(true);
+   when(broParser.validate(enriched)).thenReturn(false);
+
+   Map<String, ParserComponent> components = new HashMap<>();
+   components.put("bro", new ParserComponent(broParser, stellarFilter));
+   parserRunner.setSensorToParserComponentMap(components);
+
+   Optional<ProcessResult> outcome =
+       parserRunner.processMessage("bro", parsed, raw, broParser, parserConfigurations);
+
+   Assert.assertTrue(outcome.isPresent());
+   Assert.assertTrue(outcome.get().isError());
+   Assert.assertEquals(expectedError, outcome.get().getError());
+ }
+
+ /**
+  * Checks that processMessage() returns a PARSER_INVALID error result naming
+  * "ip_src_addr" and "ip_dst_addr" as error fields when both hold the non-IP
+  * value "test", even though the parser's own validate() accepts the message.
+  */
+ @Test
+ public void shouldReturnMetronErrorOnFailedFieldValidator() {
+   RawMessage raw = new RawMessage("raw_message".getBytes(), new HashMap<>());
+   JSONObject parsed = new JSONObject();
+   parsed.put("guid", "guid");
+   parsed.put("ip_src_addr", "test");
+   parsed.put("ip_dst_addr", "test");
+
+   // Message content used to stub the filter and the validator.
+   JSONObject enriched = new JSONObject();
+   enriched.put("guid", "guid");
+   enriched.put("ip_src_addr", "test");
+   enriched.put("ip_dst_addr", "test");
+   enriched.put("source.type", "bro");
+
+   MetronError expectedError = new MetronError()
+       .withErrorType(Constants.ErrorType.PARSER_INVALID)
+       .withSensorType(Collections.singleton("bro"))
+       .addRawMessage(parsed)
+       .withErrorFields(new HashSet<>(Arrays.asList("ip_src_addr", "ip_dst_addr")));
+
+   // Filter and parser both accept the message; only the field validation can fail.
+   when(stellarFilter.emit(enriched, parserRunner.getStellarContext())).thenReturn(true);
+   when(broParser.validate(enriched)).thenReturn(true);
+
+   Map<String, ParserComponent> components = new HashMap<>();
+   components.put("bro", new ParserComponent(broParser, stellarFilter));
+   parserRunner.setSensorToParserComponentMap(components);
+
+   Optional<ProcessResult> outcome =
+       parserRunner.processMessage("bro", parsed, raw, broParser, parserConfigurations);
+
+   Assert.assertTrue(outcome.isPresent());
+   Assert.assertTrue(outcome.get().isError());
+   Assert.assertEquals(expectedError, outcome.get().getError());
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index e5e7180..9f58d1c 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,719 +17,428 @@
*/
package org.apache.metron.parsers.bolt;
-import com.google.common.collect.ImmutableList;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.parsers.DefaultMessageParserResult;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.DefaultParserRunnerResults;
+import org.apache.metron.parsers.ParserRunnerResults;
+import org.apache.metron.parsers.ParserRunnerImpl;
import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
+import org.apache.storm.Config;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.mockito.Mock;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.function.Supplier;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ParserBoltTest extends BaseBoltTest {
- @Mock
- private MultilineMessageParser<JSONObject> parser;
-
- @Mock
- private MessageWriter<JSONObject> writer;
-
- @Mock
- private BulkMessageWriter<JSONObject> batchWriter;
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
@Mock
- private MessageFilter<JSONObject> filter;
+ private Tuple t1;
@Mock
- private Tuple t1;
+ private ParserRunnerImpl parserRunner;
@Mock
- private Tuple t2;
+ private WriterHandler writerHandler;
@Mock
- private Tuple t3;
+ private WriterHandler writerHandlerHandleAck;
@Mock
- private Tuple t4;
+ private MessageGetStrategy messageGetStrategy;
@Mock
- private Tuple t5;
+ private Context stellarContext;
- private static class RecordingWriter implements BulkMessageWriter<JSONObject> {
- List<JSONObject> records = new ArrayList<>();
+ private class MockParserRunner extends ParserRunnerImpl {
- @Override
- public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
+ private boolean isInvalid = false;
+ private RawMessage rawMessage;
+ private JSONObject message;
+ public MockParserRunner(HashSet<String> sensorTypes) {
+ super(sensorTypes);
}
@Override
- public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
- records.addAll(messages);
- BulkWriterResponse ret = new BulkWriterResponse();
- ret.addAllSuccesses(tuples);
- return ret;
+ public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+ DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+ this.rawMessage = rawMessage;
+ if (!isInvalid) {
+ parserRunnerResults.addMessage(message);
+ } else {
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_INVALID)
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(message);
+ parserRunnerResults.addError(error);
+ }
+ return parserRunnerResults;
}
- @Override
- public String getName() {
- return "recording";
+ protected void setInvalid(boolean isInvalid) {
+ this.isInvalid = isInvalid;
}
- @Override
- public void close() throws Exception {
-
+ protected void setMessage(JSONObject message) {
+ this.message = message;
}
- public List<JSONObject> getRecords() {
- return records;
+ protected RawMessage getRawMessage() {
+ return rawMessage;
}
}
- private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return createUpdater(Optional.empty());
- }
- private static ConfigurationsUpdater<ParserConfigurations> createUpdater(Optional<Integer> batchSize) {
- return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
- @Override
- public void update(CuratorFramework client, String path, byte[] data) throws IOException { }
+ @Before
+ public void setup() {
+ when(writerHandler.handleAck()).thenReturn(false);
+ when(writerHandlerHandleAck.handleAck()).thenReturn(true);
- @Override
- public void delete(CuratorFramework client, String path, byte[] data) throws IOException { }
-
- @Override
- public ConfigurationType getType() {
- return ConfigurationType.PARSER;
- }
-
- @Override
- public void update(String name, byte[] data) throws IOException { }
-
- @Override
- public void delete(String name) { }
-
- @Override
- public Class<ParserConfigurations> getConfigurationClass() {
- return ParserConfigurations.class;
- }
-
- @Override
- public void forceUpdate(CuratorFramework client) { }
-
- @Override
- public ParserConfigurations defaultConfigurations() {
- return new ParserConfigurations() {
- @Override
- public SensorParserConfig getSensorParserConfig(String sensorType) {
- return new SensorParserConfig() {
- @Override
- public Map<String, Object> getParserConfig() {
- return new HashMap<String, Object>() {{
- if(batchSize.isPresent()) {
- put(IndexingConfigurations.BATCH_SIZE_CONF, batchSize.get());
- }
- }};
- }
- };
- }
- };
- }
- };
}
-
@Test
- public void testEmpty() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- null,
- new WriterHandler(writer)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
- @Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater();
- }
- };
-
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(writer, times(1)).init();
- byte[] sampleBinary = "some binary message".getBytes();
-
- when(tuple.getBinary(0)).thenReturn(sampleBinary);
- when(parser.parseOptionalResult(sampleBinary)).thenReturn(null);
- parserBolt.execute(tuple);
- verify(parser, times(0)).validate(any());
- verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any());
- verify(outputCollector, times(1)).ack(tuple);
-
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_ERROR)
- .withThrowable(new NullPointerException())
- .withSensorType(Collections.singleton(sensorType))
- .addRawMessage(sampleBinary);
- verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+ public void shouldThrowExceptionOnDifferentHandleAck() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("All writers must match when calling handleAck()");
+
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ put("bro", writerHandlerHandleAck);
+ }});
}
@Test
- public void testInvalid() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- null,
- new WriterHandler(writer)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
- @Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater();
- }
- };
+ public void withBatchTimeoutDivisorShouldSetBatchTimeoutDivisor() {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}).withBatchTimeoutDivisor(5);
- buildGlobalConfig(parserBolt);
+ Assert.assertEquals(5, parserBolt.getBatchTimeoutDivisor());
+ }
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- byte[] sampleBinary = "some binary message".getBytes();
-
- when(tuple.getBinary(0)).thenReturn(sampleBinary);
- JSONObject parsedMessage = new JSONObject();
- parsedMessage.put("field", "invalidValue");
- parsedMessage.put("guid", "this-is-unique-identifier-for-tuple");
- List<JSONObject> messageList = new ArrayList<>();
- messageList.add(parsedMessage);
- when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messageList)));
- when(parser.validate(parsedMessage)).thenReturn(true);
- parserBolt.execute(tuple);
+ @Test
+ public void shouldThrowExceptionOnInvalidBatchTimeoutDivisor() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("batchTimeoutDivisor must be positive. Value provided was -1");
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_INVALID)
- .withSensorType(Collections.singleton(sensorType))
- .withErrorFields(new HashSet<String>() {{ add("field"); }})
- .addRawMessage(new JSONObject(){{
- put("field", "invalidValue");
- put("source.type", "yaf");
- put("guid", "this-is-unique-identifier-for-tuple");
- }});
- verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}).withBatchTimeoutDivisor(-1);
}
@Test
- public void test() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- null,
- new WriterHandler(writer)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+ public void shouldGetComponentConfiguration() {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
+
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater();
+ public ParserConfigurations getConfigurations() {
+ ParserConfigurations configurations = new ParserConfigurations();
+ SensorParserConfig sensorParserConfig = new SensorParserConfig();
+ sensorParserConfig.setParserConfig(new HashMap<String, Object>() {{
+ put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
+ }});
+ configurations.updateSensorParserConfig("yaf", sensorParserConfig);
+ return configurations;
}
};
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(writer, times(1)).init();
- byte[] sampleBinary = "some binary message".getBytes();
- JSONParser jsonParser = new JSONParser();
- final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
- final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
- List<JSONObject> messages = new ArrayList<JSONObject>() {{
- add(sampleMessage1);
- add(sampleMessage2);
- }};
- final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
- final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
- when(tuple.getBinary(0)).thenReturn(sampleBinary);
- when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messages)));
- when(parser.validate(eq(messages.get(0)))).thenReturn(true);
- when(parser.validate(eq(messages.get(1)))).thenReturn(false);
- parserBolt.execute(tuple);
- verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage1));
- verify(outputCollector, times(1)).ack(tuple);
- when(parser.validate(eq(messages.get(0)))).thenReturn(true);
- when(parser.validate(eq(messages.get(1)))).thenReturn(true);
- when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false);
- when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true);
- parserBolt.execute(tuple);
- verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
- verify(outputCollector, times(2)).ack(tuple);
- doThrow(new Exception()).when(writer).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
- parserBolt.execute(tuple);
- verify(outputCollector, times(1)).reportError(any(Throwable.class));
- }
- /**
- {
- "filterClassName" : "STELLAR"
- ,"parserConfig" : {
- "filter.query" : "exists(field1)"
- }
- }
- */
- @Multiline
- public static String sensorParserConfig;
-
- /**
- * Tests to ensure that a message that is unfiltered results in one write and an ack.
- * @throws Exception
- */
- @Test
- public void testFilterSuccess() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- null,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig);
-
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
- BulkWriterResponse successResponse = mock(BulkWriterResponse.class);
- when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1));
- when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse);
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
- put("field1", "blah");
- }})))));
- parserBolt.execute(t1);
- verify(batchWriter, times(1)).write(any(), any(), any(), any());
- verify(outputCollector, times(1)).ack(t1);
+ Map<String, Object> componentConfiguration = parserBolt.getComponentConfiguration();
+ Assert.assertEquals(1, componentConfiguration.size());
+ Assert.assertEquals( 14, componentConfiguration.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS));
}
-
- /**
- * Tests to ensure that a message filtered out results in no writes, but an ack.
- * @throws Exception
- */
@Test
- public void testFilterFailure() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- null,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+ public void shouldPrepare() {
+ Map stormConf = mock(Map.class);
+ SensorParserConfig yafConfig = mock(SensorParserConfig.class);
+ when(yafConfig.getSensorTopic()).thenReturn("yafTopic");
+ when(yafConfig.getParserConfig()).thenReturn(new HashMap<String, Object>() {{
+ put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
+ }});
+ ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+
+ ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
+
@Override
protected SensorParserConfig getSensorParserConfig(String sensorType) {
- try {
- return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if ("yaf".equals(sensorType)) {
+ return yafConfig;
}
+ return null;
}
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater();
+ public ParserConfigurations getConfigurations() {
+ return parserConfigurations;
}
- };
+ });
+ doReturn(stellarContext).when(parserBolt).initializeStellar();
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
- put("field2", "blah");
- }})))));
- parserBolt.execute(t1);
- verify(batchWriter, times(0)).write(any(), any(), any(), any());
- verify(outputCollector, times(1)).ack(t1);
- }
- /**
- {
- "sensorTopic":"dummy"
- ,"parserConfig": {
- "batchSize" : 1
- }
- ,"fieldTransformations" : [
- {
- "transformation" : "STELLAR"
- ,"output" : "timestamp"
- ,"config" : {
- "timestamp" : "TO_EPOCH_TIMESTAMP(timestampstr, 'yyyy-MM-dd HH:mm:ss', 'UTC')"
- }
- }
- ]
- }
- */
- @Multiline
- public static String csvWithFieldTransformations;
- @Test
- public void testFieldTransformationPriorToValidation() {
- String sensorType = "dummy";
- RecordingWriter recordingWriter = new RecordingWriter();
- //create a parser which acts like a basic parser but returns no timestamp field.
- MultilineMessageParser<JSONObject> dummyParser = new MultilineMessageParser<JSONObject>() {
- @Override
- public void configure(Map<String, Object> config) {
- }
+ parserBolt.prepare(stormConf, topologyContext, outputCollector);
- @Override
- public void init() {
- }
+ verify(parserRunner, times(1)).init(any(Supplier.class), eq(stellarContext));
+ verify(yafConfig, times(1)).init();
+ Map<String, String> topicToSensorMap = parserBolt.getTopicToSensorMap();
+ Assert.assertEquals(1, topicToSensorMap.size());
+ Assert.assertEquals("yaf", topicToSensorMap.get("yafTopic"));
+ verify(writerHandler).init(stormConf, topologyContext, outputCollector, parserConfigurations);
+ verify(writerHandler).setDefaultBatchTimeout(14);
+ }
- @Override
- public boolean validate(JSONObject message) {
- Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName());
- if (timestampObject instanceof Long) {
- Long timestamp = (Long) timestampObject;
- return timestamp > 0;
- }
- return false;
- }
+ @Test
+ public void shouldThrowExceptionOnMissingConfig() {
+ exception.expect(IllegalStateException.class);
+ exception.expectMessage("Unable to retrieve a parser config for yaf");
- @Override
- @SuppressWarnings("unchecked")
- public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
- return Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject() {{
- put("data", "foo");
- put("timestampstr", "2016-01-05 17:02:30");
- put("original_string", "blah");
- }})));
- }
- };
+ Map stormConf = mock(Map.class);
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- dummyParser,
- null,
- new WriterHandler(recordingWriter)
- )
- );
- ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations);
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }});
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- when(t1.getBinary(0)).thenReturn(new byte[] {});
- parserBolt.execute(t1);
- Assert.assertEquals(1, recordingWriter.getRecords().size());
- long expected = 1452013350000L;
- Assert.assertEquals(expected, recordingWriter.getRecords().get(0).get("timestamp"));
+
+ parserBolt.prepare(stormConf, topologyContext, outputCollector);
}
+
@Test
- public void testDefaultBatchSize() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- filter,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+ public void executeShouldHandleTickTuple() throws Exception {
+ when(t1.getSourceComponent()).thenReturn("__system");
+ when(t1.getSourceStreamId()).thenReturn("__tick");
+ ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
+
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- // this uses default batch size
- return ParserBoltTest.createUpdater();
+ public ParserConfigurations getConfigurations() {
+ return parserConfigurations;
}
};
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
- when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
- BulkWriterResponse response = new BulkWriterResponse();
- Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE];
- for (int i=0; i < uniqueTuples.length; i++) {
- uniqueTuples[i] = mock(Tuple.class);
- response.addSuccess(uniqueTuples[i]);
- }
- when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response);
- for (Tuple tuple : uniqueTuples) {
- parserBolt.execute(tuple);
- }
- for (Tuple uniqueTuple : uniqueTuples) {
- verify(outputCollector, times(1)).ack(uniqueTuple);
- }
+ parserBolt.setMessageGetStrategy(messageGetStrategy);
+ parserBolt.setOutputCollector(outputCollector);
+
+ parserBolt.execute(t1);
+
+ verify(writerHandler, times(1)).flush(parserConfigurations, messageGetStrategy);
+ verify(outputCollector, times(1)).ack(t1);
}
@Test
- public void testLessRecordsThanDefaultBatchSize() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- filter,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+ public void shouldExecuteOnSuccess() throws Exception {
+ when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+ when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+ MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
+ ParserConfigurations parserConfigurations = new ParserConfigurations();
+ parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
+
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- // this uses default batch size
- return ParserBoltTest.createUpdater();
+ public ParserConfigurations getConfigurations() {
+ return parserConfigurations;
}
};
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
- when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
- int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1;
- BulkWriterResponse response = new BulkWriterResponse();
- Tuple[] uniqueTuples = new Tuple[oneLessThanDefaultBatchSize];
- for (int i=0; i < uniqueTuples.length; i++) {
- uniqueTuples[i] = mock(Tuple.class);
- response.addSuccess(uniqueTuples[i]);
+ parserBolt.setMessageGetStrategy(messageGetStrategy);
+ parserBolt.setOutputCollector(outputCollector);
+ parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+ put("yafTopic", "yaf");
+ }});
+ JSONObject message = new JSONObject();
+ message.put("field", "value");
+ mockParserRunner.setMessage(message);
+ RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
+
+ {
+ // Verify the correct message is written and ack is handled
+ parserBolt.execute(t1);
+
+ Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
+ verify(writerHandler, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy);
+ verify(outputCollector, times(1)).ack(t1);
}
- for (Tuple tuple : uniqueTuples) {
- parserBolt.execute(tuple);
+ {
+ // Verify the tuple is not acked when the writer is set to handle ack
+ reset(outputCollector);
+ parserBolt.setSensorToWriterMap(new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandlerHandleAck);
+ }});
+
+ parserBolt.execute(t1);
+
+ verify(writerHandlerHandleAck, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy);
+ verify(outputCollector, times(0)).ack(t1);
}
- // should have no acking yet - batch size not fulfilled
- verify(outputCollector, never()).ack(any(Tuple.class));
- response.addSuccess(t1); // used to achieve count in final verify
- Iterable<Tuple> tuples = new HashSet(Arrays.asList(uniqueTuples)) {{
- add(t1);
- }};
- when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
- // meet batch size requirement and now it should ack
- parserBolt.execute(t1);
- verify(outputCollector, times(ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE)).ack(any(Tuple.class));
}
@Test
- public void testBatchOfOne() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- filter,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+ public void shouldExecuteOnError() throws Exception {
+ when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+ when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+ MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{
+ add("yaf");
+ }});
+ mockParserRunner.setInvalid(true);
+ ParserConfigurations parserConfigurations = new ParserConfigurations();
+ parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
+
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater(Optional.of(1));
+ public ParserConfigurations getConfigurations() {
+ return parserConfigurations;
}
};
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
- when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
- BulkWriterResponse response = new BulkWriterResponse();
- response.addSuccess(t1);
- when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response);
+ parserBolt.setMessageGetStrategy(messageGetStrategy);
+ parserBolt.setOutputCollector(outputCollector);
+ parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+ put("yafTopic", "yaf");
+ }});
+ JSONObject message = new JSONObject();
+ message.put("field", "value");
+ mockParserRunner.setMessage(message);
+ RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_INVALID)
+ .withSensorType(Collections.singleton("yaf"))
+ .addRawMessage(message);
+
parserBolt.execute(t1);
+
+ Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
+ verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
verify(outputCollector, times(1)).ack(t1);
+
}
@Test
- public void testBatchOfFive() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- filter,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
- @Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater(Optional.of(5));
- }
- } ;
-
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
- when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
- Set<Tuple> tuples = Stream.of(t1, t2, t3, t4, t5).collect(Collectors.toSet());
- BulkWriterResponse response = new BulkWriterResponse();
- response.addAllSuccesses(tuples);
- when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
- writeNonBatch(outputCollector, parserBolt, t1);
- writeNonBatch(outputCollector, parserBolt, t2);
- writeNonBatch(outputCollector, parserBolt, t3);
- writeNonBatch(outputCollector, parserBolt, t4);
- parserBolt.execute(t5);
- verify(batchWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any());
- verify(outputCollector, times(1)).ack(t1);
- verify(outputCollector, times(1)).ack(t2);
- verify(outputCollector, times(1)).ack(t3);
- verify(outputCollector, times(1)).ack(t4);
- verify(outputCollector, times(1)).ack(t5);
+ public void shouldThrowExceptionOnFailedExecute() {
+ when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+ when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+ ParserConfigurations parserConfigurations = new ParserConfigurations();
+ parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+ doThrow(new IllegalStateException("parserRunner.execute failed")).when(parserRunner).execute(eq("yaf"), any(), eq(parserConfigurations));
- }
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
- @Test
- public void testBatchOfFiveWithError() throws Exception {
- String sensorType = "yaf";
- Map<String, ParserComponents> parserMap = Collections.singletonMap(
- sensorType,
- new ParserComponents(
- parser,
- filter,
- new WriterHandler(batchWriter)
- )
- );
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater(Optional.of(5));
+ public ParserConfigurations getConfigurations() {
+ return parserConfigurations;
}
};
- parserBolt.setCuratorFramework(client);
- parserBolt.setZKCache(cache);
- parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(parser, times(1)).init();
- verify(batchWriter, times(1)).init(any(), any(), any());
-
- doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
- when(parser.validate(any())).thenReturn(true);
- when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
- when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
- parserBolt.execute(t1);
- parserBolt.execute(t2);
- parserBolt.execute(t3);
- parserBolt.execute(t4);
- parserBolt.execute(t5);
- verify(batchWriter, times(1)).write(any(), any(), any(), any());
- verify(outputCollector, times(1)).ack(t1);
- verify(outputCollector, times(1)).ack(t2);
- verify(outputCollector, times(1)).ack(t3);
- verify(outputCollector, times(1)).ack(t4);
- verify(outputCollector, times(1)).ack(t5);
+ parserBolt.setMessageGetStrategy(messageGetStrategy);
+ parserBolt.setOutputCollector(outputCollector);
+ parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+ put("yafTopic", "yaf");
+ }});
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withThrowable(new IllegalStateException("parserRunner.execute failed"))
+ .withSensorType(Collections.singleton("yaf"))
+ .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
- }
+ parserBolt.execute(t1);
- protected void buildGlobalConfig(ParserBolt parserBolt) {
- HashMap<String, Object> globalConfig = new HashMap<>();
- Map<String, Object> fieldValidation = new HashMap<>();
- fieldValidation.put("input", Arrays.asList("field"));
- fieldValidation.put("validation", "STELLAR");
- fieldValidation.put("config", new HashMap<String, String>(){{ put("condition", "field != 'invalidValue'"); }});
- globalConfig.put("fieldValidations", Arrays.asList(fieldValidation));
- parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
+ verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+ verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
+ verify(outputCollector, times(1)).ack(t1);
}
- private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap,
- String csvWithFieldTransformations) {
- return new ParserBolt("zookeeperUrl", parserMap) {
- @Override
- protected SensorParserConfig getSensorParserConfig(String sensorType) {
- try {
- return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
+ @Test
+ public void shouldThrowExceptionOnFailedWrite() throws Exception {
+ when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+ when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+ MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
+ ParserConfigurations parserConfigurations = new ParserConfigurations();
+ parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+ doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any(), any(), any());
+
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+ put("yaf", writerHandler);
+ }}) {
@Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater(Optional.of(1));
+ public ParserConfigurations getConfigurations() {
+ return parserConfigurations;
}
};
- }
- private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
- bolt.execute(t);
- }
+ parserBolt.setMessageGetStrategy(messageGetStrategy);
+ parserBolt.setOutputCollector(outputCollector);
+ parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+ put("yafTopic", "yaf");
+ }});
+ JSONObject message = new JSONObject();
+ message.put("field", "value");
+ mockParserRunner.setMessage(message);
+
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withThrowable(new IllegalStateException("write failed"))
+ .withSensorType(Collections.singleton("yaf"))
+ .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
+ parserBolt.execute(t1);
+
+ verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+ verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
+ verify(outputCollector, times(1)).ack(t1);
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index 0d6eef8..31d87a7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -25,21 +25,20 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+
import org.apache.commons.lang.SerializationUtils;
-import org.apache.metron.common.configuration.FieldValidator;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.common.writer.MessageWriter;
import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerImpl;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.topology.ParserComponents;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
@@ -83,41 +82,13 @@ public class ParserDriver implements Serializable {
List<byte[]> errors = new ArrayList<>();
public ShimParserBolt(List<byte[]> output) {
- super(null
- , Collections.singletonMap(
- sensorType == null ? config.getSensorTopic() : sensorType,
- new ParserComponents(
- ReflectionUtils.createInstance(config.getParserClassName()),
- null,
- new WriterHandler(new CollectingWriter(output))
- )
- )
- );
+ super(null, parserRunner, Collections.singletonMap(sensorType, new WriterHandler(new CollectingWriter(output))));
this.output = output;
- Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap();
- for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) {
- sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig());
- }
}
@Override
public ParserConfigurations getConfigurations() {
- return new ParserConfigurations() {
- @Override
- public SensorParserConfig getSensorParserConfig(String sensorType) {
- return config;
- }
-
- @Override
- public Map<String, Object> getGlobalConfig() {
- return globalConfig;
- }
-
- @Override
- public List<FieldValidator> getFieldValidations() {
- return new ArrayList<>();
- }
- };
+ return config;
}
@Override
@@ -125,7 +96,7 @@ public class ParserDriver implements Serializable {
}
@Override
- protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+ protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
errors.add(originalMessage);
LOG.error("Error parsing message: " + ex.getMessage(), ex);
}
@@ -139,14 +110,20 @@ public class ParserDriver implements Serializable {
}
- private SensorParserConfig config;
- private Map<String, Object> globalConfig;
+ private ParserConfigurations config;
private String sensorType;
+ private ParserRunner parserRunner;
public ParserDriver(String sensorType, String parserConfig, String globalConfig) throws IOException {
- config = SensorParserConfig.fromBytes(parserConfig.getBytes());
- this.sensorType = sensorType;
- this.globalConfig = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER);
+ SensorParserConfig sensorParserConfig = SensorParserConfig.fromBytes(parserConfig.getBytes());
+ this.sensorType = sensorType == null ? sensorParserConfig.getSensorTopic() : sensorType;
+ config = new ParserConfigurations();
+ config.updateSensorParserConfig(this.sensorType, SensorParserConfig.fromBytes(parserConfig.getBytes()));
+ config.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER));
+
+ parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+ add(sensorType);
+ }});
}
public ProcessorResult<List<byte[]>> run(Iterable<byte[]> in) {
@@ -163,6 +140,7 @@ public class ParserDriver implements Serializable {
+ /**
+ * Builds a mocked Tuple that returns this driver's sensor type for the "topic" string field and the given
+ * record for binary field 0.
+ *
+ * @param record Raw message bytes the tuple should carry
+ * @return The mocked tuple
+ */
public Tuple toTuple(byte[] record) {
Tuple ret = mock(Tuple.class);
+ when(ret.getStringByField("topic")).thenReturn(sensorType);
when(ret.getBinary(eq(0))).thenReturn(record);
return ret;
}
[2/2] metron git commit: METRON-1681 Decouple the ParserBolt from the
Parse execution logic (merrimanr) closes apache/metron#1213
Posted by rm...@apache.org.
METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/28542ad6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/28542ad6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/28542ad6
Branch: refs/heads/master
Commit: 28542ad64cf63f17b728b4b1c0e995a8973767f7
Parents: 08f3de0
Author: merrimanr <me...@gmail.com>
Authored: Thu Oct 18 13:59:52 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Thu Oct 18 13:59:52 2018 -0500
----------------------------------------------------------------------
.../impl/SensorParserConfigServiceImpl.java | 51 +-
.../parsers/DefaultParserRunnerResults.java | 71 ++
.../org/apache/metron/parsers/GrokParser.java | 3 +-
.../org/apache/metron/parsers/ParserRunner.java | 60 ++
.../apache/metron/parsers/ParserRunnerImpl.java | 322 +++++++
.../metron/parsers/ParserRunnerResults.java | 33 +
.../apache/metron/parsers/bolt/ParserBolt.java | 381 +++-----
.../parsers/filters/BroMessageFilter.java | 2 +-
.../metron/parsers/filters/StellarFilter.java | 2 +-
.../parsers/interfaces/MessageFilter.java | 2 +-
.../parsers/interfaces/MessageParser.java | 27 +-
.../interfaces/MultilineMessageParser.java | 51 --
.../metron/parsers/syslog/Syslog5424Parser.java | 4 +-
.../parsers/topology/ParserComponent.java | 56 ++
.../parsers/topology/ParserComponents.java | 67 --
.../parsers/topology/ParserTopologyBuilder.java | 39 +-
.../org/apache/metron/filters/FiltersTest.java | 4 +-
.../metron/parsers/MessageParserTest.java | 108 ++-
.../metron/parsers/ParserRunnerImplTest.java | 390 +++++++++
.../metron/parsers/bolt/ParserBoltTest.java | 859 ++++++-------------
.../parsers/integration/ParserDriver.java | 60 +-
21 files changed, 1481 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index 4cd272e..d0e4b3d 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -20,12 +20,10 @@ package org.apache.metron.rest.service.impl;
import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.configuration.ConfigurationType;
@@ -35,18 +33,14 @@ import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
import org.apache.metron.rest.service.GrokService;
import org.apache.metron.rest.service.SensorParserConfigService;
import org.apache.metron.rest.util.ParserIndex;
-import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
-import org.reflections.Reflections;
-import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -141,53 +135,13 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
} else if (sensorParserConfig.getParserClassName() == null) {
throw new RestException("SensorParserConfig must have a parserClassName");
} else {
- MultilineMessageParser<JSONObject> parser;
- Object parserObject;
+ MessageParser<JSONObject> parser;
try {
- parserObject = Class.forName(sensorParserConfig.getParserClassName())
+ parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
.newInstance();
} catch (Exception e) {
throw new RestException(e.toString(), e.getCause());
}
-
- if (!(parserObject instanceof MultilineMessageParser)) {
- parser = new MultilineMessageParser<JSONObject>() {
-
- @Override
- @SuppressWarnings("unchecked")
- public void configure(Map<String, Object> config) {
- ((MessageParser<JSONObject>)parserObject).configure(config);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void init() {
- ((MessageParser<JSONObject>)parserObject).init();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public boolean validate(JSONObject message) {
- return ((MessageParser<JSONObject>)parserObject).validate(message);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public List<JSONObject> parse(byte[] message) {
- return ((MessageParser<JSONObject>)parserObject).parse(message);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Optional<List<JSONObject>> parseOptional(byte[] message) {
- return ((MessageParser<JSONObject>)parserObject).parseOptional(message);
- }
- };
- } else {
- parser = (MultilineMessageParser<JSONObject>)parserObject;
- }
-
-
Path temporaryGrokPath = null;
if (isGrokConfig(sensorParserConfig)) {
String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
@@ -195,7 +149,6 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
sensorParserConfig.getParserConfig()
.put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
}
-
parser.configure(sensorParserConfig.getParserConfig());
parser.init();
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
new file mode 100644
index 0000000..79a9b5d
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Default implementation of ParserRunnerResults. Parsed messages and errors are accumulated in two
+ * in-memory lists through the add methods.
+ */
+public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> {
+
+ private final List<JSONObject> messages = new ArrayList<>();
+ private final List<MetronError> errors = new ArrayList<>();
+
+ /**
+ * @return The backing list of messages added so far (not a copy)
+ */
+ @Override
+ public List<JSONObject> getMessages() {
+ return messages;
+ }
+
+ /**
+ * @return The backing list of errors added so far (not a copy)
+ */
+ @Override
+ public List<MetronError> getErrors() {
+ return errors;
+ }
+
+ /**
+ * @param message Message to append to the list of messages
+ */
+ public void addMessage(JSONObject message) {
+ this.messages.add(message);
+ }
+
+ /**
+ * @param error Error to append to the list of errors
+ */
+ public void addError(MetronError error) {
+ this.errors.add(error);
+ }
+
+ /**
+ * @param errors Errors to append, in order, to the list of errors
+ */
+ public void addErrors(List<MetronError> errors) {
+ this.errors.addAll(errors);
+ }
+
+ /**
+ * Two results are equal when they are of the same class and hold equal message and error lists.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ParserRunnerResults<?> other = (ParserRunnerResults<?>) o;
+ return Objects.equals(messages, other.getMessages())
+ && Objects.equals(errors, other.getErrors());
+ }
+
+ @Override
+ public int hashCode() {
+ // Both lists are final and initialized at declaration, so no null handling is needed.
+ return 31 * messages.hashCode() + errors.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index a81149d..6bdfb81 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ import java.util.Optional;
import java.util.TimeZone;
-public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable {
+public class GrokParser implements MessageParser<JSONObject>, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
new file mode 100644
index 0000000..f9123b1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.stellar.dsl.Context;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser.
+ * The information needed to initialize a MessageParser is supplied by the parser config supplier. After the parsers
+ * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object
+ * that contains a list of parsed messages and/or a list of errors.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunner<T> {
+
+ /**
+ * Return the set of all sensor types that can be parsed with this ParserRunner.
+ * @return Sensor types
+ */
+ Set<String> getSensorTypes();
+
+ /**
+ * Initializes the MessageParsers for the sensor types handled by this ParserRunner. Call this before execute.
+ * @param parserConfigSupplier Supplies parser configurations
+ * @param stellarContext Stellar context used to apply Stellar functions during field transformations
+ */
+ void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext);
+
+ /**
+ * Parses a raw message with the MessageParser for the given sensor type and returns the resulting
+ * messages and/or errors.
+ * @param sensorType Sensor type of the message
+ * @param rawMessage Raw message including metadata
+ * @param parserConfigurations Parser configurations
+ * @return ParserRunnerResults containing a list of messages and a list of errors
+ */
+ ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations);
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
new file mode 100644
index 0000000..a986db7
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of a ParserRunner. It keeps one ParserComponent (a MessageParser plus an optional
+ * MessageFilter) per configured sensor type and uses it to parse, transform, filter and validate raw messages.
+ */
+public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable {
+
+ /**
+ * Outcome of post-processing a single parsed message: either the processed message or a MetronError.
+ * Declared static because it never uses the enclosing ParserRunnerImpl instance.
+ */
+ static class ProcessResult {
+
+ private final JSONObject message;
+ private final MetronError error;
+
+ public ProcessResult(JSONObject message) {
+ this.message = message;
+ this.error = null;
+ }
+
+ public ProcessResult(MetronError error) {
+ this.message = null;
+ this.error = error;
+ }
+
+ public JSONObject getMessage() {
+ return message;
+ }
+
+ public MetronError getError() {
+ return error;
+ }
+
+ public boolean isError() {
+ return error != null;
+ }
+ }
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Not referenced within this class; kept because they are protected and may be used by subclasses.
+ protected transient Consumer<ParserRunnerResults> onSuccess;
+ protected transient Consumer<MetronError> onError;
+
+ private HashSet<String> sensorTypes;
+ private Map<String, ParserComponent> sensorToParserComponentMap;
+
+ // Stellar variables
+ private transient Context stellarContext;
+
+ /**
+ * @param sensorTypes Sensor types this runner creates parsers for when init is called
+ */
+ public ParserRunnerImpl(HashSet<String> sensorTypes) {
+ this.sensorTypes = sensorTypes;
+ }
+
+ public Map<String, ParserComponent> getSensorToParserComponentMap() {
+ return sensorToParserComponentMap;
+ }
+
+ public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) {
+ this.sensorToParserComponentMap = sensorToParserComponentMap;
+ }
+
+ public Context getStellarContext() {
+ return stellarContext;
+ }
+
+ @Override
+ public Set<String> getSensorTypes() {
+ return sensorTypes;
+ }
+
+ /**
+ * Stores the Stellar context and creates, configures and initializes a MessageParser (and optional
+ * MessageFilter) for every sensor type of this runner.
+ *
+ * @param parserConfigSupplier Supplies parser configurations
+ * @param stellarContext Stellar context used to apply Stellar functions during field transformations
+ * @throws IllegalStateException if either argument is null or a sensor type has no parser configuration
+ */
+ @Override
+ public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) {
+ if (parserConfigSupplier == null) {
+ throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner.");
+ }
+ if (stellarContext == null) {
+ throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner.");
+ }
+ this.stellarContext = stellarContext;
+ initializeParsers(parserConfigSupplier);
+ }
+
+ /**
+ * Parses messages with the appropriate MessageParser based on sensor type. The resulting list of messages are then
+ * post-processed and added to the ParserRunnerResults message list. Any errors that happen during post-processing are
+ * added to the ParserRunnerResults error list. Any exceptions (including a master exception) reported by the
+ * MessageParser are also added to the ParserRunnerResults error list.
+ *
+ * @param sensorType Sensor type of the message
+ * @param rawMessage Raw message including metadata
+ * @param parserConfigurations Parser configurations
+ * @return ParserRunnerResults containing a list of messages and a list of errors
+ * @throws IllegalStateException if there is no configuration or no initialized parser for the sensor type
+ */
+ @Override
+ public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+ if (parserConfigurations.getSensorParserConfig(sensorType) == null) {
+ throw new IllegalStateException(String.format("Could not execute parser. Cannot find configuration for sensor %s.",
+ sensorType));
+ }
+ // Fail with a descriptive error instead of a NullPointerException when init() has not been called or the
+ // sensor type is not one this runner created a parser for.
+ ParserComponent parserComponent = sensorToParserComponentMap == null ? null : sensorToParserComponentMap.get(sensorType);
+ if (parserComponent == null) {
+ throw new IllegalStateException(String.format("Could not execute parser. No parser has been initialized for sensor %s.",
+ sensorType));
+ }
+
+ DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+ MessageParser<JSONObject> parser = parserComponent.getMessageParser();
+ Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage());
+ if (optionalMessageParserResult.isPresent()) {
+ MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get();
+
+ // Process each message returned from the MessageParser
+ messageParserResult.getMessages().forEach(message -> {
+ Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations);
+ if (processResult.isPresent()) {
+ if (processResult.get().isError()) {
+ parserRunnerResults.addError(processResult.get().getError());
+ } else {
+ parserRunnerResults.addMessage(processResult.get().getMessage());
+ }
+ }
+ });
+
+ // If a master exception is reported by the MessageParser, wrap it with a MetronError and add it to the list of errors
+ messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withThrowable(throwable)
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(rawMessage.getMessage())));
+
+ // If per-message exceptions are reported by the MessageParser, wrap them with MetronErrors and add them to the list of errors
+ parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withThrowable(entry.getValue())
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(entry.getKey())).collect(Collectors.toList()));
+ }
+ return parserRunnerResults;
+ }
+
+ /**
+ * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner. Objects are created
+ * using reflection and the MessageParser configure and init methods are called.
+ * @param parserConfigSupplier Supplies parser configurations
+ */
+ private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) {
+ LOG.info("Initializing parsers...");
+ sensorToParserComponentMap = new HashMap<>();
+ for(String sensorType: sensorTypes) {
+ // Read the config once so the null check and the later use see the same object.
+ SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType);
+ if (parserConfig == null) {
+ throw new IllegalStateException(String.format("Could not initialize parsers. Cannot find configuration for sensor %s.",
+ sensorType));
+ }
+
+ LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ",
+ sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName());
+
+ // create message parser
+ MessageParser<JSONObject> parser = ReflectionUtils
+ .createInstance(parserConfig.getParserClassName());
+
+ // create message filter
+ MessageFilter<JSONObject> filter = null;
+ parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext);
+ if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+ filter = Filters.get(
+ parserConfig.getFilterClassName(),
+ parserConfig.getParserConfig()
+ );
+ }
+
+ parser.configure(parserConfig.getParserConfig());
+ parser.init();
+ sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter));
+ }
+ }
+
+ /**
+ * Post-processes parsed messages by:
+ * <ul>
+ * <li>Merging metadata and applying field transformations defined in the sensor parser config</li>
+ * <li>Filtering messages using the configured MessageFilter class</li>
+ * <li>Validating messages using the MessageParser validate method and the configured field validations</li>
+ * </ul>
+ * If a message is successfully processed, it is returned in a ProcessResult. If a message fails
+ * validation, a MetronError object is created and returned in a ProcessResult. If a message is
+ * filtered out an empty Optional is returned.
+ *
+ * @param sensorType Sensor type of the message
+ * @param message Message parsed by the MessageParser
+ * @param rawMessage Raw message including metadata
+ * @param parser MessageParser for the sensor type
+ * @param parserConfigurations Parser configurations
+ * @return The processed message or an error, or an empty Optional when the message was filtered out
+ */
+ @SuppressWarnings("unchecked")
+ protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage,
+ MessageParser<JSONObject> parser,
+ ParserConfigurations parserConfigurations
+ ) {
+ SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+ sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+ message,
+ rawMessage.getMetadata(),
+ sensorParserConfig.getMergeMetadata(),
+ sensorParserConfig.getRawMessageStrategyConfig()
+ );
+ message.put(Constants.SENSOR_TYPE, sensorType);
+ applyFieldTransformations(message, rawMessage, sensorParserConfig);
+ if (!message.containsKey(Constants.GUID)) {
+ message.put(Constants.GUID, UUID.randomUUID().toString());
+ }
+ MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter();
+ if (filter != null && !filter.emit(message, stellarContext)) {
+ // Filtered out: neither a message nor an error is reported.
+ return Optional.empty();
+ }
+
+ // The configured field validations are only evaluated when the parser's own validation passes.
+ List<FieldValidator> failedValidators = Collections.emptyList();
+ boolean isInvalid = !parser.validate(message);
+ if (!isInvalid) {
+ failedValidators = getFailedValidators(message, parserConfigurations);
+ isInvalid = !failedValidators.isEmpty();
+ }
+ if (!isInvalid) {
+ return Optional.of(new ProcessResult(message));
+ }
+
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.PARSER_INVALID)
+ .withSensorType(Collections.singleton(sensorType))
+ .addRawMessage(message);
+ Set<String> errorFields = failedValidators.stream()
+ .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+ .collect(Collectors.toSet());
+ if (!errorFields.isEmpty()) {
+ error.withErrorFields(errorFields);
+ }
+ return Optional.of(new ProcessResult(error));
+ }
+
+ /**
+ * Applies the field transformations defined in the sensor parser config.
+ * @param message Message parsed by the MessageParser
+ * @param rawMessage Raw message including metadata
+ * @param sensorParserConfig Sensor parser config
+ */
+ private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) {
+ for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+ if (handler != null) {
+ if (!sensorParserConfig.getMergeMetadata()) {
+ //if we haven't merged metadata, then we need to pass them along as configuration params.
+ handler.transformAndUpdate(
+ message,
+ stellarContext,
+ sensorParserConfig.getParserConfig(),
+ rawMessage.getMetadata()
+ );
+ } else {
+ handler.transformAndUpdate(
+ message,
+ stellarContext,
+ sensorParserConfig.getParserConfig()
+ );
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs every configured field validation against the message.
+ * @param message Message to validate
+ * @param parserConfigurations Parser configurations supplying the validations and the global config
+ * @return The validators that reported the message as invalid; empty when all of them pass
+ */
+ private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) {
+ List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations();
+ List<FieldValidator> failedValidators = new ArrayList<>();
+ for(FieldValidator validator : fieldValidations) {
+ if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) {
+ failedValidators.add(validator);
+ }
+ }
+ return failedValidators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
new file mode 100644
index 0000000..7ca853c
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+
+import java.util.List;
+
+/**
+ * Container for the results of parsing a message with a ParserRunner.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunnerResults<T> {
+
+ /**
+ * @return The successfully parsed messages
+ */
+ List<T> getMessages();
+
+ /**
+ * @return The errors collected while parsing
+ */
+ List<MetronError> getErrors();
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 05334c2..a9ee305 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -18,13 +18,18 @@
package org.apache.metron.parsers.bolt;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
import com.github.benmanes.caffeine.cache.Cache;
-import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredParserBolt;
-import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
@@ -33,12 +38,8 @@ import org.apache.metron.common.message.MessageGetters;
import org.apache.metron.common.message.metadata.RawMessage;
import org.apache.metron.common.message.metadata.RawMessageUtil;
import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerResults;
import org.apache.metron.stellar.common.CachingStellarProcessor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -56,45 +57,32 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
public class ParserBolt extends ConfiguredParserBolt implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private OutputCollector collector;
- private Map<String, ParserComponents> sensorToComponentMap;
+ private ParserRunner<JSONObject> parserRunner;
+ private Map<String, WriterHandler> sensorToWriterMap;
private Map<String, String> topicToSensorMap = new HashMap<>();
- private Context stellarContext;
private transient MessageGetStrategy messageGetStrategy;
- private transient Cache<CachingStellarProcessor.Key, Object> cache;
private int requestedTickFreqSecs;
private int defaultBatchTimeout;
private int batchTimeoutDivisor = 1;
public ParserBolt( String zookeeperUrl
- , Map<String, ParserComponents> sensorToComponentMap
+ , ParserRunner parserRunner
+ , Map<String, WriterHandler> sensorToWriterMap
) {
super(zookeeperUrl);
- this.sensorToComponentMap = sensorToComponentMap;
+ this.parserRunner = parserRunner;
+ this.sensorToWriterMap = sensorToWriterMap;
// Ensure that all sensors are either bulk sensors or not bulk sensors. Can't mix and match.
Boolean handleAcks = null;
- for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
- boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+ for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+ boolean writerHandleAck = entry.getValue().handleAck();
if (handleAcks == null) {
handleAcks = writerHandleAck;
} else if (!handleAcks.equals(writerHandleAck)) {
@@ -130,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
/**
* Used only for unit testing
- * @param defaultBatchTimeout
*/
- protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
- this.defaultBatchTimeout = defaultBatchTimeout;
+ public int getBatchTimeoutDivisor() {
+ return batchTimeoutDivisor;
}
/**
* Used only for unit testing
*/
- public int getDefaultBatchTimeout() {
- return defaultBatchTimeout;
+ protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) {
+ this.sensorToWriterMap = sensorToWriterMap;
}
- public Map<String, ParserComponents> getSensorToComponentMap() {
- return sensorToComponentMap;
+ /**
+ * Used only for unit testing
+ */
+ protected Map<String, String> getTopicToSensorMap() {
+ return topicToSensorMap;
+ }
+
+ /**
+ * Used only for unit testing
+ */
+ protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) {
+ this.topicToSensorMap = topicToSensorMap;
+ }
+
+ /**
+ * Used only for unit testing
+ */
+ public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) {
+ this.messageGetStrategy = messageGetStrategy;
+ }
+
+ /**
+ * Used only for unit testing
+ */
+ public void setOutputCollector(OutputCollector collector) {
+ this.collector = collector;
}
/**
@@ -159,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
// to get the valid WriterConfiguration. But don't store any non-serializable objects,
// else Storm will throw a runtime error.
Function<WriterConfiguration, WriterConfiguration> configurationXform;
- WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+ WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue();
if (writer.isWriterToBulkWriter()) {
configurationXform = WriterToBulkWriter.TRANSFORMATION;
} else {
@@ -189,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
super.prepare(stormConf, context, collector);
messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
this.collector = collector;
-
- // Build the Stellar cache
- Map<String, Object> cacheConfig = new HashMap<>();
- for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
- String sensor = entry.getKey();
- SensorParserConfig config = getSensorParserConfig(sensor);
-
- if (config != null) {
- cacheConfig.putAll(config.getCacheConfig());
- }
- }
- cache = CachingStellarProcessor.createCache(cacheConfig);
+ this.parserRunner.init(this::getConfigurations, initializeStellar());
// Need to prep all sensors
- for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+ for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) {
String sensor = entry.getKey();
- MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
-
- initializeStellar();
- if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
- getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
- if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
- MessageFilter<JSONObject> filter = Filters.get(
- getSensorParserConfig(sensor).getFilterClassName(),
- getSensorParserConfig(sensor).getParserConfig()
- );
- getSensorToComponentMap().get(sensor).setFilter(filter);
- }
- }
-
- parser.init();
SensorParserConfig config = getSensorParserConfig(sensor);
if (config != null) {
@@ -229,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
throw new IllegalStateException(
"Unable to retrieve a parser config for " + sensor);
}
- parser.configure(config.getParserConfig());
- WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+ WriterHandler writer = sensorToWriterMap.get(sensor);
writer.init(stormConf, context, collector, getConfigurations());
if (defaultBatchTimeout == 0) {
//This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
@@ -246,225 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
}
}
- protected void initializeStellar() {
- Context.Builder builder = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
- .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
- ;
- if(cache != null) {
- builder = builder.with(Context.Capabilities.CACHE, () -> cache);
- }
- this.stellarContext = builder.build();
- StellarFunctions.initialize(stellarContext);
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
- try {
- for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
- entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "This should have been caught in the writerHandler. If you see this, file a JIRA", e);
- } finally {
- collector.ack(tuple);
- }
+ handleTickTuple(tuple);
return;
}
-
byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+ String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+ String sensorType = topicToSensorMap.get(topic);
try {
- SensorParserConfig sensorParserConfig;
- MessageParser<JSONObject> parser;
- String sensor;
- Map<String, Object> metadata;
- if (sensorToComponentMap.size() == 1) {
- // There's only one parser, so grab info directly
- Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
- .next();
- sensor = sensorParser.getKey();
- parser = sensorParser.getValue().getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- } else {
- // There's multiple parsers, so pull the topic from the Tuple and look up the sensor
- String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
- sensor = topicToSensorMap.get(topic);
- parser = sensorToComponentMap.get(sensor).getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- }
+ ParserConfigurations parserConfigurations = getConfigurations(); // fetched once; the same object is used for the config lookup and passed to the runner
+ SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); // NOTE(review): not null-checked; a missing config would NPE on the next line and land in the catch (Throwable) of this method
+ RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
+ , tuple
+ , originalMessage
+ , sensorParserConfig.getReadMetadata()
+ , sensorParserConfig.getRawMessageStrategyConfig()
+ );
+ ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations); // parse execution is delegated to the runner; the bolt only writes the results and reports the errors
+ long numWritten = parserRunnerResults.getMessages().stream()
+ .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector)) // side effect: handleMessage performs the write and yields true on success
+ .filter(result -> result) // keep only successful writes
+ .count(); // terminal operation that drives the writes above; numWritten is the number of successful writes
+ parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error)); // every error returned by the runner is reported through ErrorUtils
- List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
- boolean ackTuple = false;
- int numWritten = 0;
- if (sensorParserConfig != null) {
- RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
- , tuple
- , originalMessage
- , sensorParserConfig.getReadMetadata()
- , sensorParserConfig.getRawMessageStrategyConfig()
- );
-
- metadata = rawMessage.getMetadata();
-
- MultilineMessageParser mmp = null;
- if (!(parser instanceof MultilineMessageParser)) {
- mmp = new MultilineMessageParser<JSONObject>() {
-
- @Override
- public void configure(Map<String, Object> config) {
- parser.configure(config);
- }
-
- @Override
- public void init() {
- parser.init();
- }
-
- @Override
- public boolean validate(JSONObject message) {
- return parser.validate(message);
- }
-
- @Override
- public List<JSONObject> parse(byte[] message) {
- return parser.parse(message);
- }
-
- @Override
- public Optional<List<JSONObject>> parseOptional(byte[] message) {
- return parser.parseOptional(message);
- }
- };
- } else {
- mmp = (MultilineMessageParser) parser;
- }
-
- Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage());
-
- // check if there is a master error
- if (results.isPresent() && results.get().getMasterThrowable().isPresent()) {
- handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector);
- return;
- }
-
- // Handle the message results
- List<JSONObject> messages = results.isPresent() ? results.get().getMessages() : Collections.EMPTY_LIST;
- for (JSONObject message : messages) {
- //we want to ack the tuple in the situation where we have are not doing a bulk write
- //otherwise we want to defer to the writerComponent who will ack on bulk commit.
- WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
- ackTuple = !writer.handleAck();
-
- sensorParserConfig.getRawMessageStrategy().mergeMetadata(
- message,
- metadata,
- sensorParserConfig.getMergeMetadata(),
- sensorParserConfig.getRawMessageStrategyConfig()
- );
- message.put(Constants.SENSOR_TYPE, sensor);
-
- for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
- if (handler != null) {
- if (!sensorParserConfig.getMergeMetadata()) {
- //if we haven't merged metadata, then we need to pass them along as configuration params.
- handler.transformAndUpdate(
- message,
- stellarContext,
- sensorParserConfig.getParserConfig(),
- metadata
- );
- } else {
- handler.transformAndUpdate(
- message,
- stellarContext,
- sensorParserConfig.getParserConfig()
- );
- }
- }
- }
- if (!message.containsKey(Constants.GUID)) {
- message.put(Constants.GUID, UUID.randomUUID().toString());
- }
-
- MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
- if (filter == null || filter.emitTuple(message, stellarContext)) {
- boolean isInvalid = !parser.validate(message);
- List<FieldValidator> failedValidators = null;
- if (!isInvalid) {
- failedValidators = getFailedValidators(message, fieldValidations);
- isInvalid = !failedValidators.isEmpty();
- }
- if (isInvalid) {
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_INVALID)
- .withSensorType(Collections.singleton(sensor))
- .addRawMessage(message);
- Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
- .flatMap(fieldValidator -> fieldValidator.getInput().stream())
- .collect(Collectors.toSet());
- if (errorFields != null && !errorFields.isEmpty()) {
- error.withErrorFields(errorFields);
- }
- ErrorUtils.handleError(collector, error);
- } else {
- numWritten++;
- writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
- }
- }
- }
-
- // Handle the error results
- Map<Object, Throwable> messageErrors = results.isPresent()
- ? results.get().getMessageThrowables() : Collections.EMPTY_MAP;
-
- for (Entry<Object,Throwable> entry : messageErrors.entrySet()) {
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_ERROR)
- .withThrowable(entry.getValue())
- .withSensorType(sensorToComponentMap.keySet())
- .addRawMessage(originalMessage);
- ErrorUtils.handleError(collector, error);
- }
- }
//if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
//(meaning that none of the messages are valid either globally or locally)
//then we want to handle the ack ourselves.
- if (ackTuple || numWritten == 0) {
+ if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) {
collector.ack(tuple);
}
} catch (Throwable ex) {
- handleError(originalMessage, tuple, ex, collector);
+ handleError(sensorType, originalMessage, tuple, ex, collector);
+ collector.ack(tuple);
}
}
- protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+ protected Context initializeStellar() { // Builds the Stellar Context, initializes StellarFunctions with it, and returns it to the caller.
+ Map<String, Object> cacheConfig = new HashMap<>(); // cache settings merged across all sensor types known to the runner
+ for (String sensorType: this.parserRunner.getSensorTypes()) {
+ SensorParserConfig config = getSensorParserConfig(sensorType);
+
+ if (config != null) { // sensors without a parser config contribute nothing to the cache settings
+ cacheConfig.putAll(config.getCacheConfig()); // putAll: on duplicate keys the sensor iterated last wins
+ }
+ }
+ Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig);
+
+ Context.Builder builder = new Context.Builder()
+ .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+ .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) // STELLAR_CONFIG is supplied from the same global config as GLOBAL_CONFIG
+ ;
+ if(cache != null) { // NOTE(review): presumably createCache returns null when caching is not configured - confirm
+ builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+ }
+ Context stellarContext = builder.build();
+ StellarFunctions.initialize(stellarContext);
+ return stellarContext; // the context is now a local and is returned rather than kept in a bolt field
+ }
+
+ protected void handleTickTuple(Tuple tuple) { // Flushes every sensor's writer in response to a tick tuple, then acks the tick.
+ try {
+ for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+ entry.getValue().flush(getConfigurations(), messageGetStrategy);
+ }
+ } catch (Exception e) { // a failure in one writer aborts the loop, so writers later in the iteration are not flushed on this tick
+ throw new RuntimeException(
+ "This should have been caught in the writerHandler. If you see this, file a JIRA", e);
+ } finally {
+ collector.ack(tuple); // the tick tuple is acked whether or not flushing succeeded
+ }
+ }
+
+ protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) { // Writes one parsed message with the sensor's writer; returns true if the write call succeeded, false if it threw.
+ WriterHandler writer = sensorToWriterMap.get(sensorType); // null when no writer is registered for sensorType; the resulting NPE is raised inside the try and reported like any write failure
+ try {
+ writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy);
+ return true;
+ } catch (Exception ex) {
+ handleError(sensorType, originalMessage, tuple, ex, collector); // report the failure on the error stream; acking is left to the caller
+ return false;
+ }
+ }
+
+ protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { // Reports ex as a PARSER_ERROR carrying the raw message; it does not ack the tuple, so callers must ack it themselves.
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(ex)
- .withSensorType(sensorToComponentMap.keySet())
+ .withSensorType(Collections.singleton(sensorType)) // the error is attributed to the one sensor being processed rather than to every sensor in the bolt
.addRawMessage(originalMessage);
ErrorUtils.handleError(collector, error);
- collector.ack(tuple);
- }
-
- private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
- List<FieldValidator> failedValidators = new ArrayList<>();
- for(FieldValidator validator : validators) {
- if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) {
- failedValidators.add(validator);
- }
- }
- return failedValidators;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
}
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index 9cdafa3..1fa1feb 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{
*/
@Override
- public boolean emitTuple(JSONObject message, Context context) {
+ public boolean emit(JSONObject message, Context context) {
String protocol = (String) message.get(_key);
return _known_protocols.contains(protocol);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
index 15a035a..8300ff4 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
@@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> {
}
@Override
- public boolean emitTuple(JSONObject message, Context context) {
+ public boolean emit(JSONObject message, Context context) {
VariableResolver resolver = new MapVariableResolver(message);
return processor.parse(query, resolver, functionResolver, context);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index b7b91c0..207c070 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context;
public interface MessageFilter<T> extends Configurable{
- boolean emitTuple(T message, Context context);
+ boolean emit(T message, Context context);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 665076b..c9f8351 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.parsers.interfaces;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.metron.parsers.DefaultMessageParserResult;
import java.io.Serializable;
@@ -38,18 +39,42 @@ public interface MessageParser<T> extends Configurable {
* @param rawMessage the raw bytes of the message
* @return If null is returned, this is treated as an empty list.
*/
- List<T> parse(byte[] rawMessage);
+ @Deprecated // NOTE(review): no replacement is named here - presumably parseOptionalResult is the intended successor; confirm and document
+ default List<T> parse(byte[] rawMessage) { // default body so implementations are no longer forced to provide parse
+ throw new NotImplementedException("parse is not implemented"); // reached through parseOptional when an implementation overrides none of the parse methods
+ }
/**
* Take raw data and convert it to an optional list of messages.
* @param parseMessage the raw bytes of the message
* @return If null is returned, this is treated as an empty list.
*/
+ @Deprecated // NOTE(review): no replacement is named here - presumably parseOptionalResult is the intended successor; confirm and document
default Optional<List<T>> parseOptional(byte[] parseMessage) {
return Optional.ofNullable(parse(parseMessage));
}
/**
+ * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore
+ * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced
+ * and the errors. This default implementation delegates to {@link #parseOptional(byte[])}.
+ * @param parseMessage the raw bytes of the message
+ * @return the parsed messages wrapped in a {@link MessageParserResult}; empty if {@code parseOptional} returns empty; a result wrapping the {@link Throwable} if parsing throws
+ */
+ default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+ Optional<MessageParserResult<T>> result = Optional.empty(); // stays empty when parseOptional yields no list
+ try {
+ Optional<List<T>> optionalMessages = parseOptional(parseMessage);
+ if (optionalMessages.isPresent()) {
+ result = Optional.of(new DefaultMessageParserResult<>(optionalMessages.get()));
+ }
+ } catch (Throwable t) { // any failure, including the default parse throwing NotImplementedException, is returned inside the result instead of propagating
+ return Optional.of(new DefaultMessageParserResult<>(t));
+ }
+ return result;
+ }
+
+ /**
* Validate the message to ensure that it's correct.
* @param message the message to validate
* @return true if the message is valid, false if not
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
deleted file mode 100644
index 7818f9a..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.interfaces;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.metron.parsers.DefaultMessageParserResult;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public interface MultilineMessageParser<T> extends MessageParser<T> {
-
- default List<T> parse(byte[] rawMessage) {
- throw new NotImplementedException("parse is not implemented");
- }
-
- /**
- * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore
- * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced
- * and the errors.
- * @param parseMessage the raw bytes of the message
- * @return Optional of {@link MessageParserResult}
- */
- default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
- List<T> list = new ArrayList<>();
- try {
- Optional<List<T>> optionalMessages = parseOptional(parseMessage);
- optionalMessages.ifPresent(list::addAll);
- } catch (Throwable t) {
- return Optional.of(new DefaultMessageParserResult<>(t));
- }
- return Optional.of(new DefaultMessageParserResult<T>(list));
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
index 79a082a..5b62e85 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
@@ -26,8 +26,8 @@ import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.parsers.BasicParser;
import org.apache.metron.parsers.DefaultMessageParserResult;
+import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import java.util.Optional;
/**
* Parser for well structured RFC 5424 messages.
*/
-public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable {
+public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String NIL_POLICY_CONFIG = "nilPolicy";
private transient SyslogParser syslogParser;
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
new file mode 100644
index 0000000..eb5ff9f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
+public class ParserComponent implements Serializable { // Serializable holder pairing a MessageParser with a MessageFilter, both typed on JSONObject.
+ private static final long serialVersionUID = 7880346740026374665L; // same value the removed ParserComponents class declared
+
+ private MessageParser<JSONObject> messageParser; // parser for this component
+ private MessageFilter<JSONObject> filter; // NOTE(review): may be null - presumably meaning "no filter"; confirm against callers
+
+ public ParserComponent(
+ MessageParser<JSONObject> messageParser,
+ MessageFilter<JSONObject> filter) { // stores both references as given; no validation or copying
+ this.messageParser = messageParser;
+ this.filter = filter;
+ }
+
+ public MessageParser<JSONObject> getMessageParser() { // returns the stored parser reference
+ return messageParser;
+ }
+
+ public MessageFilter<JSONObject> getFilter() { // returns the stored filter reference
+ return filter;
+ }
+
+ public void setMessageParser(
+ MessageParser<JSONObject> messageParser) { // replaces the stored parser
+ this.messageParser = messageParser;
+ }
+
+ public void setFilter(
+ MessageFilter<JSONObject> filter) { // replaces the stored filter
+ this.filter = filter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
deleted file mode 100644
index 32d56b9..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.topology;
-
-import java.io.Serializable;
-import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-
-public class ParserComponents implements Serializable {
- private static final long serialVersionUID = 7880346740026374665L;
-
- private MessageParser<JSONObject> messageParser;
- private MessageFilter<JSONObject> filter;
- private WriterHandler writer;
-
- public ParserComponents(
- MessageParser<JSONObject> messageParser,
- MessageFilter<JSONObject> filter,
- WriterHandler writer) {
- this.messageParser = messageParser;
- this.filter = filter;
- this.writer = writer;
- }
-
- public MessageParser<JSONObject> getMessageParser() {
- return messageParser;
- }
-
- public MessageFilter<JSONObject> getFilter() {
- return filter;
- }
-
- public WriterHandler getWriter() {
- return writer;
- }
-
- public void setMessageParser(
- MessageParser<JSONObject> messageParser) {
- this.messageParser = messageParser;
- }
-
- public void setFilter(
- MessageFilter<JSONObject> filter) {
- this.filter = filter;
- }
-
- public void setWriter(WriterHandler writer) {
- this.writer = writer;
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d20e1a5..9dc7b88 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,11 +21,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.metron.common.Constants;
@@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils;
import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.parsers.ParserRunnerImpl;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.topology.config.ValueSupplier;
import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
@@ -268,29 +266,14 @@ public class ParserTopologyBuilder {
Optional<String> securityProtocol,
ParserConfigurations configs,
Optional<String> outputTopic) {
-
- Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+ Map<String, WriterHandler> writerConfigs = new HashMap<>();
for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
String sensorType = entry.getKey();
SensorParserConfig parserConfig = entry.getValue();
- // create message parser
- MessageParser<JSONObject> parser = ReflectionUtils
- .createInstance(parserConfig.getParserClassName());
- parser.configure(parserConfig.getParserConfig());
-
- // create message filter
- MessageFilter<JSONObject> filter = null;
- if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
- filter = Filters.get(
- parserConfig.getFilterClassName(),
- parserConfig.getParserConfig()
- );
- }
// create a writer
AbstractWriter writer;
if (parserConfig.getWriterClassName() == null) {
-
// if not configured, use a sensible default
writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
.withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
@@ -304,16 +287,10 @@ public class ParserTopologyBuilder {
// create a writer handler
WriterHandler writerHandler = createWriterHandler(writer);
-
- ParserComponents components = new ParserComponents(
- parser,
- filter,
- writerHandler
- );
- parserBoltConfigs.put(sensorType, components);
+ writerConfigs.put(sensorType, writerHandler);
}
- return new ParserBolt(zookeeperUrl, parserBoltConfigs);
+ return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs);
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
index 8441409..2f3784a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -42,8 +42,8 @@ public class FiltersTest {
put("filter.query", "exists(foo)");
}};
MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config);
- Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
- Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
+ Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
+ Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
index 9769baa..4842a1f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
@@ -19,68 +19,118 @@
package org.apache.metron.parsers;
import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class MessageParserTest {
- @Test
- public void testNullable() throws Exception {
- MessageParser parser = new MessageParser() {
- @Override
- public void init() {
- }
+ abstract class TestMessageParser implements MessageParser<JSONObject> {
+ @Override
+ public void init() {
+ }
- @Override
- public List parse(byte[] rawMessage) {
- return null;
- }
+ @Override
+ public boolean validate(JSONObject message) {
+ return false;
+ }
- @Override
- public boolean validate(Object message) {
- return false;
- }
+ @Override
+ public void configure(Map<String, Object> config) {
- @Override
- public void configure(Map<String, Object> config) {
+ }
+ }
+ @Test
+ public void testNullable() throws Exception {
+ MessageParser parser = new TestMessageParser() {
+ @Override
+ public List<JSONObject> parse(byte[] rawMessage) {
+ return null;
}
};
- Assert.assertNotNull(parser.parseOptional(null));
- Assert.assertFalse(parser.parseOptional(null).isPresent());
+ Assert.assertNotNull(parser.parseOptionalResult(null));
+ Assert.assertFalse(parser.parseOptionalResult(null).isPresent());
}
@Test
public void testNotNullable() throws Exception {
- MessageParser parser = new MessageParser() {
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public void init() {
-
+ public List<JSONObject> parse(byte[] rawMessage) {
+ return new ArrayList<>();
}
+ };
+ Assert.assertNotNull(parser.parseOptionalResult(null));
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null);
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertEquals(0, ret.get().getMessages().size());
+ }
+ @Test
+ public void testParse() {
+ JSONObject message = new JSONObject();
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public List parse(byte[] rawMessage) {
- return new ArrayList<>();
+ public List<JSONObject> parse(byte[] rawMessage) {
+ return Collections.singletonList(message);
}
+ };
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertEquals(1, ret.get().getMessages().size());
+ Assert.assertEquals(message, ret.get().getMessages().get(0));
+ }
+ @Test
+ public void testParseOptional() {
+ JSONObject message = new JSONObject();
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public boolean validate(Object message) {
- return false;
+ public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+ return Optional.of(Collections.singletonList(message));
}
+ };
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertEquals(1, ret.get().getMessages().size());
+ Assert.assertEquals(message, ret.get().getMessages().get(0));
+ }
+ @Test
+ public void testParseException() {
+ MessageParser<JSONObject> parser = new TestMessageParser() {
@Override
- public void configure(Map<String, Object> config) {
+ public List<JSONObject> parse(byte[] rawMessage) {
+ throw new RuntimeException("parse exception");
+ }
+ };
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+ Assert.assertTrue(ret.isPresent());
+ Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+ Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
+ }
+ @Test
+ public void testParseOptionalException() {
+ MessageParser<JSONObject> parser = new TestMessageParser() {
+ @Override
+ public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+ throw new RuntimeException("parse exception");
}
};
- Assert.assertNotNull(parser.parseOptional(null));
- Optional<List> ret = parser.parseOptional(null);
+ Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
Assert.assertTrue(ret.isPresent());
- Assert.assertEquals(0, ret.get().size());
+ Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+ Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
}
+
}