You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:55 UTC
[25/52] [abbrv] metron git commit: METRON-1501 Parser messages that
fail to validate are dropped silently (cestella via justinleet) closes
apache/metron#972
METRON-1501 Parser messages that fail to validate are dropped silently (cestella via justinleet) closes apache/metron#972
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/0d847cf5
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/0d847cf5
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/0d847cf5
Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 0d847cf5f91dc0d3b3b6838eb4b4de4aa2cf2fec
Parents: 19b237d
Author: cestella <ce...@gmail.com>
Authored: Tue Apr 3 10:29:19 2018 -0400
Committer: leet <le...@apache.org>
Committed: Tue Apr 3 10:29:19 2018 -0400
----------------------------------------------------------------------
metron-platform/metron-parsers/README.md | 29 +++++++++++++++++++-
.../apache/metron/parsers/bolt/ParserBolt.java | 17 ++++++++----
.../metron/parsers/bolt/ParserBoltTest.java | 16 ++++++++---
3 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 3d9fdfe..6b9d62e 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -45,7 +45,34 @@ There are two general types types of parsers:
* `ERROR` : Throw an error when a multidimensional map is encountered
* `jsonpQuery` : A [JSON Path](#json_path) query string. If present, the result of the JSON Path query should be a list of messages. This is useful if you have a JSON document which contains a list or array of messages embedded in it, and you do not have another means of splitting the message.
* A field called `timestamp` is expected to exist and, if it does not, then current time is inserted.
-
+
+## Parser Error Routing
+
+Currently, we have a few mechanisms for either deferring processing of
+messages or marking messages as invalid.
+
+### Invalidation Errors
+
+There are two reasons a message will be marked as invalid:
+* Fail [global validation](../metron-common#validation-framework)
+* Fail the parser's validate function (generally, that means the message lacks a `timestamp` field or an `original_string` field).
+
+Those messages which are marked as invalid are sent to the error queue
+with an indication that they are invalid in the error message.
+
+### Parser Errors
+
+Errors, which are defined as unexpected exceptions happening during the
+parse, are sent along to the error queue with a message indicating that
+there was an error during parsing, along with a stack trace. This
+distinguishes them from invalid messages.
+
+## Filtered
+
+One can also filter a message by specifying a `filterClassName` in the
+parser config. Filtered messages are just dropped rather than passed
+through.
+
## Parser Architecture
![Architecture](parser_arch.png)
http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 6fc4ed7..e996f14 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -193,23 +193,28 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
message.put(Constants.GUID, UUID.randomUUID().toString());
}
- if (parser.validate(message) && (filter == null || filter.emitTuple(message, stellarContext))) {
- numWritten++;
- List<FieldValidator> failedValidators = getFailedValidators(message, fieldValidations);
- if(failedValidators.size() > 0) {
+ if (filter == null || filter.emitTuple(message, stellarContext)) {
+ boolean isInvalid = !parser.validate(message);
+ List<FieldValidator> failedValidators = null;
+ if(!isInvalid) {
+ failedValidators = getFailedValidators(message, fieldValidations);
+ isInvalid = !failedValidators.isEmpty();
+ }
+ if( isInvalid) {
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(getSensorType())
.addRawMessage(message);
- Set<String> errorFields = failedValidators.stream()
+ Set<String> errorFields = failedValidators == null?null:failedValidators.stream()
.flatMap(fieldValidator -> fieldValidator.getInput().stream())
.collect(Collectors.toSet());
- if (!errorFields.isEmpty()) {
+ if (errorFields != null && !errorFields.isEmpty()) {
error.withErrorFields(errorFields);
}
ErrorUtils.handleError(collector, error);
}
else {
+ numWritten++;
writer.write(getSensorType(), tuple, message, getConfigurations(), messageGetStrategy);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 3316b32..6439b2b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -118,6 +118,9 @@ public class ParserBoltTest extends BaseBoltTest {
}
private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+ return createUpdater(Optional.empty());
+ }
+ private static ConfigurationsUpdater<ParserConfigurations> createUpdater(Optional<Integer> batchSize) {
return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
@Override
public void update(CuratorFramework client, String path, byte[] data) throws IOException { }
@@ -153,6 +156,9 @@ public class ParserBoltTest extends BaseBoltTest {
@Override
public Map<String, Object> getParserConfig() {
return new HashMap<String, Object>() {{
+ if(batchSize.isPresent()) {
+ put(IndexingConfigurations.BATCH_SIZE_CONF, batchSize.get());
+ }
}};
}
};
@@ -502,9 +508,9 @@ public void testImplicitBatchOfOne() throws Exception {
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater();
+ return ParserBoltTest.createUpdater(Optional.of(5));
}
- };
+ } ;
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
@@ -524,6 +530,7 @@ public void testImplicitBatchOfOne() throws Exception {
writeNonBatch(outputCollector, parserBolt, t3);
writeNonBatch(outputCollector, parserBolt, t4);
parserBolt.execute(t5);
+ verify(batchWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any());
verify(outputCollector, times(1)).ack(t1);
verify(outputCollector, times(1)).ack(t2);
verify(outputCollector, times(1)).ack(t3);
@@ -540,7 +547,7 @@ public void testImplicitBatchOfOne() throws Exception {
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater();
+ return ParserBoltTest.createUpdater(Optional.of(5));
}
};
@@ -552,7 +559,7 @@ public void testImplicitBatchOfOne() throws Exception {
doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+ when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
parserBolt.withMessageFilter(filter);
parserBolt.execute(t1);
@@ -560,6 +567,7 @@ public void testImplicitBatchOfOne() throws Exception {
parserBolt.execute(t3);
parserBolt.execute(t4);
parserBolt.execute(t5);
+ verify(batchWriter, times(1)).write(any(), any(), any(), any());
verify(outputCollector, times(1)).ack(t1);
verify(outputCollector, times(1)).ack(t2);
verify(outputCollector, times(1)).ack(t3);