You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/01/28 14:38:44 UTC
[metron] branch master updated: METRON-1948 Dropped messages from REGEX_SELECT parser field transformation are not acked in Storm (merrimanr) closes apache/metron#1321
This is an automated email from the ASF dual-hosted git repository.
rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 01d6688 METRON-1948 Dropped messages from REGEX_SELECT parser field transformation are not acked in Storm (merrimanr) closes apache/metron#1321
01d6688 is described below
commit 01d66880d99f5f46770b0f02755f4e9d86980cf5
Author: merrimanr <me...@gmail.com>
AuthorDate: Mon Jan 28 08:38:33 2019 -0600
METRON-1948 Dropped messages from REGEX_SELECT parser field transformation are not acked in Storm (merrimanr) closes apache/metron#1321
---
.../integration/validation/StormParserDriver.java | 18 ++++++++---
.../apache/metron/writer/BulkWriterComponent.java | 16 ++++++++--
.../metron/writer/BulkWriterComponentTest.java | 36 +++++++++++++++++++++-
3 files changed, 63 insertions(+), 7 deletions(-)
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
index dfc454f..bfa1467 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
@@ -25,14 +25,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+
import org.apache.commons.lang.SerializationUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageWriter;
import org.apache.metron.integration.ProcessorResult;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
@@ -41,7 +47,7 @@ import org.slf4j.LoggerFactory;
public class StormParserDriver extends ParserDriver {
private static final Logger LOG = LoggerFactory.getLogger(StormParserDriver.class);
- public static class CollectingWriter implements MessageWriter<JSONObject> {
+ public static class CollectingWriter implements BulkMessageWriter<JSONObject> {
List<byte[]> output;
public CollectingWriter(List<byte[]> output) {
@@ -49,13 +55,16 @@ public class StormParserDriver extends ParserDriver {
}
@Override
- public void init() {
+ public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
}
@Override
- public void write(String sensorType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
- output.add(message.toJSONString().getBytes());
+ public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+ messages.forEach(message -> output.add(message.toJSONString().getBytes()));
+ BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
+ bulkWriterResponse.addAllSuccesses(tuples);
+ return bulkWriterResponse;
}
@Override
@@ -83,6 +92,7 @@ public class StormParserDriver extends ParserDriver {
@Override
public ParserConfigurations getConfigurations() {
+ config.getSensorParserConfig(sensorType).getParserConfig().put(IndexingConfigurations.BATCH_SIZE_CONF, 1);
return config;
}
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 68585c5..15e59d3 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -126,8 +127,10 @@ public class BulkWriterComponent<MESSAGE_T> {
.withThrowable(e)
.addRawMessage(messageGetStrategy.get(t));
collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
- collector.ack(t);
});
+ if (handleCommit) {
+ commit(tuples);
+ }
// there is only one error to report for all of the failed tuples
collector.reportError(e);
@@ -260,7 +263,7 @@ public class BulkWriterComponent<MESSAGE_T> {
}
}
- private void flush( String sensorType
+ protected void flush( String sensorType
, BulkMessageWriter<MESSAGE_T> bulkMessageWriter
, WriterConfiguration configurations
, MessageGetStrategy messageGetStrategy
@@ -282,6 +285,15 @@ public class BulkWriterComponent<MESSAGE_T> {
} else if (response.hasErrors()) {
throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors());
}
+
+ // Make sure all tuples are acked by acking any tuples not returned in the BulkWriterResponse
+ if (handleCommit) {
+ Set<Tuple> tuplesToAck = new HashSet<>(tupleList);
+ tuplesToAck.removeAll(response.getSuccesses());
+ response.getErrors().values().forEach(tuplesToAck::removeAll);
+ commit(tuplesToAck);
+ }
+
} catch (Throwable e) {
if(handleError) {
error(sensorType, e, tupleList, messageGetStrategy);
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index decf3a5..1a05ba4 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -28,10 +28,13 @@ import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
import org.apache.metron.common.message.MessageGetStrategy;
@@ -235,5 +238,36 @@ public class BulkWriterComponentTest {
verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
}
-
+ @Test
+ public void flushShouldAckMissingTuples() throws Exception{
+ BulkMessageWriter<JSONObject> bulkMessageWriter = mock(BulkMessageWriter.class);
+ Tuple successTuple = mock(Tuple.class);
+ Tuple errorTuple = mock(Tuple.class);
+ Tuple missingTuple = mock(Tuple.class);
+ Collection<Tuple> tupleList = Arrays.asList(successTuple, errorTuple, missingTuple);
+ JSONObject successMessage = new JSONObject();
+ successMessage.put("name", "success");
+ JSONObject errorMessage = new JSONObject();
+ errorMessage.put("name", "error");
+ JSONObject missingMessage = new JSONObject();
+ missingMessage.put("name", "missing");
+ List<JSONObject> messageList = Arrays.asList(successMessage, errorMessage, missingMessage);
+ OutputCollector collector = mock(OutputCollector.class);
+ BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
+ bulkWriterResponse.addSuccess(successTuple);
+ Throwable throwable = mock(Throwable.class);
+ bulkWriterResponse.addError(throwable, errorTuple);
+
+ when(bulkMessageWriter.write(sensorType, configurations, tupleList, messageList)).thenReturn(bulkWriterResponse);
+
+ BulkWriterComponent bulkWriterComponent = new BulkWriterComponent(collector, true, true);
+ bulkWriterComponent.flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+ verify(collector, times(1)).reportError(throwable);
+ verify(collector, times(1)).ack(successTuple);
+ verify(collector, times(1)).ack(errorTuple);
+ verify(collector, times(1)).ack(missingTuple);
+ verifyNoMoreInteractions(collector);
+ }
}