You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/01/28 14:38:44 UTC

[metron] branch master updated: METRON-1948 Dropped messages from REGEX_SELECT parser field transformation are not acked in Storm (merrimanr) closes apache/metron#1321

This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 01d6688  METRON-1948 Dropped messages from REGEX_SELECT parser field transformation are not acked in Storm (merrimanr) closes apache/metron#1321
01d6688 is described below

commit 01d66880d99f5f46770b0f02755f4e9d86980cf5
Author: merrimanr <me...@gmail.com>
AuthorDate: Mon Jan 28 08:38:33 2019 -0600

    METRON-1948 Dropped messages from REGEX_SELECT parser field transformation are not acked in Storm (merrimanr) closes apache/metron#1321
---
 .../integration/validation/StormParserDriver.java  | 18 ++++++++---
 .../apache/metron/writer/BulkWriterComponent.java  | 16 ++++++++--
 .../metron/writer/BulkWriterComponentTest.java     | 36 +++++++++++++++++++++-
 3 files changed, 63 insertions(+), 7 deletions(-)

diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
index dfc454f..bfa1467 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
@@ -25,14 +25,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.integration.ProcessorResult;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
 import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -41,7 +47,7 @@ import org.slf4j.LoggerFactory;
 public class StormParserDriver extends ParserDriver {
   private static final Logger LOG = LoggerFactory.getLogger(StormParserDriver.class);
 
-  public static class CollectingWriter implements MessageWriter<JSONObject> {
+  public static class CollectingWriter implements BulkMessageWriter<JSONObject> {
 
     List<byte[]> output;
     public CollectingWriter(List<byte[]> output) {
@@ -49,13 +55,16 @@ public class StormParserDriver extends ParserDriver {
     }
 
     @Override
-    public void init() {
+    public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
 
     }
 
     @Override
-    public void write(String sensorType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
-      output.add(message.toJSONString().getBytes());
+    public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+      messages.forEach(message -> output.add(message.toJSONString().getBytes()));
+      BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
+      bulkWriterResponse.addAllSuccesses(tuples);
+      return bulkWriterResponse;
     }
 
     @Override
@@ -83,6 +92,7 @@ public class StormParserDriver extends ParserDriver {
 
     @Override
     public ParserConfigurations getConfigurations() {
+      config.getSensorParserConfig(sensorType).getParserConfig().put(IndexingConfigurations.BATCH_SIZE_CONF, 1);
       return config;
     }
 
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 68585c5..15e59d3 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -126,8 +127,10 @@ public class BulkWriterComponent<MESSAGE_T> {
               .withThrowable(e)
               .addRawMessage(messageGetStrategy.get(t));
       collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
-      collector.ack(t);
     });
+    if (handleCommit) {
+      commit(tuples);
+    }
     // there is only one error to report for all of the failed tuples
     collector.reportError(e);
 
@@ -260,7 +263,7 @@ public class BulkWriterComponent<MESSAGE_T> {
     }
   }
 
-  private void flush( String sensorType
+  protected void flush( String sensorType
                     , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
                     , WriterConfiguration configurations
 		                , MessageGetStrategy messageGetStrategy
@@ -282,6 +285,15 @@ public class BulkWriterComponent<MESSAGE_T> {
       } else if (response.hasErrors()) {
         throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors());
       }
+
+      // Make sure all tuples are acked by acking any tuples not returned in the BulkWriterResponse
+      if (handleCommit) {
+        Set<Tuple> tuplesToAck = new HashSet<>(tupleList);
+        tuplesToAck.removeAll(response.getSuccesses());
+        response.getErrors().values().forEach(tuplesToAck::removeAll);
+        commit(tuplesToAck);
+      }
+
     } catch (Throwable e) {
       if(handleError) {
         error(sensorType, e, tupleList, messageGetStrategy);
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index decf3a5..1a05ba4 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -28,10 +28,13 @@ import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
@@ -235,5 +238,36 @@ public class BulkWriterComponentTest {
     verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
   }
 
-
+  @Test
+  public void flushShouldAckMissingTuples() throws Exception{ // flush() must ack every tuple, including ones absent from the BulkWriterResponse
+    BulkMessageWriter<JSONObject> bulkMessageWriter = mock(BulkMessageWriter.class);
+    Tuple successTuple = mock(Tuple.class);
+    Tuple errorTuple = mock(Tuple.class);
+    Tuple missingTuple = mock(Tuple.class);
+    Collection<Tuple> tupleList = Arrays.asList(successTuple, errorTuple, missingTuple);
+    JSONObject successMessage = new JSONObject();
+    successMessage.put("name", "success");
+    JSONObject errorMessage = new JSONObject();
+    errorMessage.put("name", "error");
+    JSONObject missingMessage = new JSONObject();
+    missingMessage.put("name", "missing");
+    List<JSONObject> messageList = Arrays.asList(successMessage, errorMessage, missingMessage);
+    OutputCollector collector = mock(OutputCollector.class);
+    BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
+    bulkWriterResponse.addSuccess(successTuple);
+    Throwable throwable = mock(Throwable.class);
+    bulkWriterResponse.addError(throwable, errorTuple);
+    // missingTuple is deliberately left out of the response: it is neither a success nor an error (a dropped message)
+    when(bulkMessageWriter.write(sensorType, configurations, tupleList, messageList)).thenReturn(bulkWriterResponse);
+
+    BulkWriterComponent bulkWriterComponent = new BulkWriterComponent(collector, true, true); // NOTE(review): flags presumably enable commit and error handling - confirm constructor parameter order
+    bulkWriterComponent.flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+    // Expect one error emitted/reported for errorTuple, and each of the three tuples acked exactly once
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+    verify(collector, times(1)).reportError(throwable);
+    verify(collector, times(1)).ack(successTuple);
+    verify(collector, times(1)).ack(errorTuple);
+    verify(collector, times(1)).ack(missingTuple);
+    verifyNoMoreInteractions(collector);
+  }
 }