You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/05/14 19:28:29 UTC

[11/11] metron git commit: METRON-1549: Add empty object test to WriterBoltIntegrationTest implementation (mmiklavc via mmiklavc) closes apache/metron#1009

METRON-1549: Add empty object test to WriterBoltIntegrationTest implementation (mmiklavc via mmiklavc) closes apache/metron#1009


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b9453aab
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b9453aab
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b9453aab

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: b9453aabd781c7c67258d9506af176fbcab85be1
Parents: a17c1ad
Author: mmiklavc <mi...@gmail.com>
Authored: Fri May 11 12:04:01 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Fri May 11 12:04:01 2018 -0600

----------------------------------------------------------------------
 .../integration/WriterBoltIntegrationTest.java  | 315 ++++++++++++++-----
 1 file changed, 231 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/b9453aab/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index cde08bc..d565147 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -17,40 +17,56 @@
  */
 package org.apache.metron.writers.integration;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.common.field.validation.FieldValidation;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
-import org.apache.metron.integration.*;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
 import org.apache.metron.integration.components.KafkaComponent;
-import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.processors.KafkaProcessor;
-import org.apache.metron.parsers.csv.CSVParser;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
-import org.json.simple.parser.ParseException;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-
 public class WriterBoltIntegrationTest extends BaseIntegrationTest {
+  private ZKServerComponent zkServerComponent;
+  private KafkaComponent kafkaComponent;
+  private ConfigUploadComponent configUploadComponent;
+  private ParserTopologyComponent parserTopologyComponent;
 
-  public static class MockValidator implements FieldValidation{
+  public static class MockValidator implements FieldValidation {
 
     @Override
     public boolean isValid(Map<String, Object> input, Map<String, Object> validationConfig, Map<String, Object> globalConfig, Context context) {
-      if(input.get("action").equals("invalid")) {
+      if (input.get("action").equals("invalid")) {
         return false;
       }
       return true;
@@ -60,6 +76,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     public void initialize(Map<String, Object> validationConfig, Map<String, Object> globalConfig) {
     }
   }
+
   /**
    * {
    *   "fieldValidations" : [
@@ -68,7 +85,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
    * }
    */
   @Multiline
-  public static String globalConfig;
+  public static String globalConfigWithValidation;
 
   /**
    * {
@@ -88,57 +105,23 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
   public static String parserConfigJSON;
 
   @Test
-  public void test() throws UnableToStartException, IOException, ParseException {
-
-    UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL);
+  public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception {
     final String sensorType = "dummy";
-
     SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
-
-    // the input messages to parser
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("valid,foo"));
       add(Bytes.toBytes("invalid,foo"));
       add(Bytes.toBytes("error"));
     }};
 
-    // setup external components; zookeeper, kafka
     final Properties topologyProperties = new Properties();
-    final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
-    final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
-      add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
-      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-    }});
-    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
-
-    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
-            .withTopologyProperties(topologyProperties)
-            .withGlobalConfig(globalConfig)
-            .withParserSensorConfig(sensorType, parserConfig);
-
-    ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
-            .withSensorType(sensorType)
-            .withTopologyProperties(topologyProperties)
-            .withBrokerUrl(kafkaComponent.getBrokerList())
-            .withErrorTopic(parserConfig.getErrorTopic())
-            .withOutputTopic(parserConfig.getOutputTopic())
-            .build();
-
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("zk", zkServerComponent)
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("config", configUploadComponent)
-            .withComponent("org/apache/storm", parserTopologyComponent)
-            .withMillisecondsBetweenAttempts(5000)
-            .withNumRetries(10)
-            .withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"})
-            .build();
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(
-              getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic()));
+      KafkaProcessor<Map<String, List<JSONObject>>> kafkaProcessor = getKafkaProcessor(
+          parserConfig.getOutputTopic(), parserConfig.getErrorTopic());
+      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(kafkaProcessor);
 
       // validate the output messages
       Map<String,List<JSONObject>> outputMessages = result.getResult();
@@ -166,45 +149,209 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  /**
+   * Setup external components (as side effects of invoking this method):
+   * zookeeper, kafka, config upload, parser topology, main runner.
+   *
+   * Modifies topology properties with relevant component properties, e.g. kafka.broker.
+   *
+   * @return runner
+   */
+  public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType,
+      SensorParserConfig parserConfig, String globalConfig) {
+    zkServerComponent = getZKServerComponent(topologyProperties);
+    kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
+      add(new KafkaComponent.Topic(sensorType, 1));
+      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
+      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }});
+    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+
+    configUploadComponent = new ConfigUploadComponent()
+        .withTopologyProperties(topologyProperties)
+        .withGlobalConfig(globalConfig)
+        .withParserSensorConfig(sensorType, parserConfig);
+
+    parserTopologyComponent = new ParserTopologyComponent.Builder()
+        .withSensorType(sensorType)
+        .withTopologyProperties(topologyProperties)
+        .withBrokerUrl(kafkaComponent.getBrokerList())
+        .withErrorTopic(parserConfig.getErrorTopic())
+        .withOutputTopic(parserConfig.getOutputTopic())
+        .build();
+
+    return new ComponentRunner.Builder()
+        .withComponent("zk", zkServerComponent)
+        .withComponent("kafka", kafkaComponent)
+        .withComponent("config", configUploadComponent)
+        .withComponent("org/apache/storm", parserTopologyComponent)
+        .withMillisecondsBetweenAttempts(5000)
+        .withNumRetries(10)
+        .withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"})
+        .build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private KafkaProcessor<Map<String, List<JSONObject>>> getKafkaProcessor(String outputTopic,
+      String errorTopic) {
+
+    return new KafkaProcessor<>()
+        .withKafkaComponentName("kafka")
+        .withReadTopic(outputTopic)
+        .withErrorTopic(errorTopic)
+        .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+          @Nullable
+          @Override
+          public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+            return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2);
+          }
+        })
+        .withProvideResult(new Function<KafkaMessageSet, Map<String, List<JSONObject>>>() {
+          @Nullable
+          @Override
+          public Map<String, List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
+            return new HashMap<String, List<JSONObject>>() {{
+              put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
+              put(errorTopic, loadMessages(messageSet.getErrors()));
+            }};
+          }
+        });
+  }
+
   private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
     List<JSONObject> tmp = new ArrayList<>();
     Iterables.addAll(tmp,
-            Iterables.transform(outputMessages,
-                    message -> {
-                      try {
-                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
-                      } catch (Exception ex) {
-                        throw new IllegalStateException(ex);
-                      }
-                    }
-            )
+        Iterables.transform(outputMessages,
+            message -> {
+              try {
+                return new JSONObject(
+                    JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
+              } catch (Exception ex) {
+                throw new IllegalStateException(ex);
+              }
+            }
+        )
     );
     return tmp;
   }
 
-  @SuppressWarnings("unchecked")
-  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){
+  /**
+   * { }
+   */
+  @Multiline
+  public static String globalConfigEmpty;
 
-    return new KafkaProcessor<>()
-            .withKafkaComponentName("kafka")
-            .withReadTopic(outputTopic)
-            .withErrorTopic(errorTopic)
-            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
-              @Nullable
-              @Override
-              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-                return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2);
-              }
-            })
-            .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){
-              @Nullable
-              @Override
-              public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
-                return new HashMap<String, List<JSONObject>>() {{
-                  put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
-                  put(errorTopic, loadMessages(messageSet.getErrors()));
-                }};
-              }
-            });
+  /**
+   * {
+   *    "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$EmptyObjectParser",
+   *    "sensorTopic":"emptyobjectparser",
+   *    "outputTopic": "enrichments",
+   *    "errorTopic": "parser_error"
+   * }
+   */
+  @Multiline
+  public static String offsetParserConfigJSON;
+
+  @Test
+  public void commits_kafka_offsets_for_emtpy_objects() throws Exception {
+    final String sensorType = "emptyobjectparser";
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
+    final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("foo"));
+      add(Bytes.toBytes("bar"));
+      add(Bytes.toBytes("baz"));
+    }};
+    final Properties topologyProperties = new Properties();
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty);
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(sensorType, inputMessages);
+      Processor allResultsProcessor = new AllResultsProcessor(inputMessages, Constants.ENRICHMENT_TOPIC);
+      ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor);
+
+      // validate the output messages
+      assertThat("size should match", result.getResult().size(), equalTo(inputMessages.size()));
+      for (JSONObject record : result.getResult()) {
+        assertThat("record should have a guid", record.containsKey("guid"), equalTo(true));
+        assertThat("record should have correct source.type", record.get("source.type"),
+            equalTo(sensorType));
+      }
+    } finally {
+      if (runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
+  /**
+   * Goal is to check returning an empty JSONObject in our List returned by parse.
+   */
+  public static class EmptyObjectParser implements MessageParser<JSONObject>, Serializable {
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public List<JSONObject> parse(byte[] bytes) {
+      return ImmutableList.of(new JSONObject());
+    }
+
+    @Override
+    public boolean validate(JSONObject message) {
+      return true;
     }
+
+    @Override
+    public void configure(Map<String, Object> map) {
+    }
+  }
+
+  /**
+   * Verifies all messages in the provided List of input messages appears in the specified
+   * Kafka output topic
+   */
+  private class AllResultsProcessor implements  Processor<Set<JSONObject>> {
+
+    private final List<byte[]> inputMessages;
+    private String outputKafkaTopic;
+    // used for calculating readiness and returning result set
+    private final Set<JSONObject> outputMessages = new HashSet<>();
+
+    public AllResultsProcessor(List<byte[]> inputMessages, String outputKafkaTopic) {
+      this.inputMessages = inputMessages;
+      this.outputKafkaTopic = outputKafkaTopic;
+    }
+
+    @Override
+    public ReadinessState process(ComponentRunner runner) {
+      KafkaComponent kc = runner.getComponent("kafka", KafkaComponent.class);
+      outputMessages.addAll(readMessagesFromKafka(kc, outputKafkaTopic));
+      return calcReadiness(inputMessages.size(), outputMessages.size());
+    }
+
+    private Set<JSONObject> readMessagesFromKafka(KafkaComponent kc, String topic) {
+      Set<JSONObject> out = new HashSet<>();
+      for (byte[] b : kc.readMessages(topic)) {
+        try {
+          JSONObject m = new JSONObject(
+              JSONUtils.INSTANCE.load(new String(b), JSONUtils.MAP_SUPPLIER));
+          out.add(m);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+      return out;
+    }
+
+    private ReadinessState calcReadiness(int in, int out) {
+      return in == out ? ReadinessState.READY : ReadinessState.NOT_READY;
+    }
+
+    @Override
+    public ProcessorResult<Set<JSONObject>> getResult() {
+      return new ProcessorResult<>(outputMessages, null);
+    }
+  }
+
 }