You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/11 01:32:58 UTC

[42/50] [abbrv] metron git commit: METRON-1642: KafkaWriter should be able to choose the topic from a field in addition to topology construction time closes apache/incubator-metron#1082

METRON-1642: KafkaWriter should be able to choose the topic from a field in addition to topology construction time closes apache/incubator-metron#1082


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/097ce950
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/097ce950
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/097ce950

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 097ce95030e31bf9bd2af74aa56dc03348f7fef7
Parents: fc9ff85
Author: cstella <ce...@gmail.com>
Authored: Tue Jul 3 09:29:47 2018 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Jul 3 09:29:47 2018 -0400

----------------------------------------------------------------------
 metron-platform/metron-parsers/README.md        |  1 +
 .../integration/WriterBoltIntegrationTest.java  | 75 +++++++++++++++++++-
 metron-platform/metron-writer/README.md         | 24 +++++++
 .../apache/metron/writer/kafka/KafkaWriter.java | 43 +++++++++--
 .../metron/writer/kafka/KafkaWriterTest.java    | 63 ++++++++++++++++
 5 files changed, 199 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index d79b9ce..7ddfdea 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -166,6 +166,7 @@ then it is assumed to be a regex and will match any topic matching the pattern (
   * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met.  Optional.
     If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm
     parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
+  * The kafka writer can be configured within the parser config as well.  (This is all configured a priori, but this is convenient for overriding the settings).  See [here](../metron-writer/README.md#kafka-writer)
 * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic.
 * `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden on the command line.
 * `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line.

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index dfadfdc..99506de 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.Serializable;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -66,7 +68,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
 
     @Override
     public boolean isValid(Map<String, Object> input, Map<String, Object> validationConfig, Map<String, Object> globalConfig, Context context) {
-      if (input.get("action").equals("invalid")) {
+      if (input.get("action") != null && input.get("action").equals("invalid")) {
         return false;
       }
       return true;
@@ -105,6 +107,69 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
   @Multiline
   public static String parserConfigJSON;
 
+  /**
+   * {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
+   *    "sensorTopic": "dummy",
+   *    "outputTopic": "output",
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "batchSize" : 1,
+   *        "columns" : {
+   *            "name" : 0,
+   *            "dummy" : 1
+   *        },
+   *      "kafka.topicField" : "route_field"
+   *    }
+   *    ,"fieldTransformations" : [
+   *    {
+   *      "transformation" : "STELLAR"
+   *     ,"input" :  ["name"]
+   *     ,"output" :  ["route_field"]
+   *     ,"config" : {
+   *        "route_field" : "match{ name == 'metron' => 'output', default => NULL}"
+   *      }
+   *    }
+   *    ]
+   * }
+   */
+  @Multiline
+  public static String parserConfigJSONKafkaRedirection;
+
+  @Test
+  public void test_topic_redirection() throws Exception {
+    final String sensorType = "dummy";
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSONKafkaRedirection, SensorParserConfig.class);
+    final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("metron,foo"));
+      add(Bytes.toBytes("notmetron,foo"));
+      add(Bytes.toBytes("metron,bar"));
+      add(Bytes.toBytes("metron,baz"));
+    }};
+
+    final Properties topologyProperties = new Properties();
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(sensorType, inputMessages);
+      KafkaProcessor<Map<String, List<JSONObject>>> kafkaProcessor = getKafkaProcessor(
+          parserConfig.getOutputTopic(), parserConfig.getErrorTopic(), kafkaMessageSet -> kafkaMessageSet.getMessages().size() == 3 && kafkaMessageSet.getErrors().isEmpty());
+      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(kafkaProcessor);
+
+      // validate the output messages
+      Map<String,List<JSONObject>> outputMessages = result.getResult();
+      for(JSONObject j : outputMessages.get(Constants.ENRICHMENT_TOPIC)) {
+        Assert.assertEquals("metron", j.get("name"));
+        Assert.assertEquals("output", j.get("route_field"));
+        Assert.assertTrue(ImmutableSet.of("foo", "bar", "baz").contains(j.get("dummy")));
+      }
+    } finally {
+      if(runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
   @Test
   public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception {
     final String sensorType = "dummy";
@@ -192,9 +257,13 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
         .build();
   }
 
-  @SuppressWarnings("unchecked")
   private KafkaProcessor<Map<String, List<JSONObject>>> getKafkaProcessor(String outputTopic,
       String errorTopic) {
+    return getKafkaProcessor(outputTopic, errorTopic, messageSet -> (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2));
+  }
+  @SuppressWarnings("unchecked")
+  private KafkaProcessor<Map<String, List<JSONObject>>> getKafkaProcessor(String outputTopic,
+      String errorTopic, Predicate<KafkaMessageSet> predicate) {
 
     return new KafkaProcessor<>()
         .withKafkaComponentName("kafka")
@@ -204,7 +273,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
           @Nullable
           @Override
           public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-            return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2);
+            return predicate.test(messageSet);
           }
         })
         .withProvideResult(new Function<KafkaMessageSet, Map<String, List<JSONObject>>>() {

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-writer/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/README.md b/metron-platform/metron-writer/README.md
index 16c6686..21cdbca 100644
--- a/metron-platform/metron-writer/README.md
+++ b/metron-platform/metron-writer/README.md
@@ -20,6 +20,30 @@ limitations under the License.
 ## Introduction
 The writer module provides some utilties for writing to outside components from within Storm.  This includes managing bulk writing.  An implemention is included for writing to HDFS in this module. Other writers can be found in their own modules.
 
+## Kafka Writer
+We have an implementation of a writer which will write batches of
+messages to Kafka.  An interesting aspect of this writer is that it can
+be configured to allow users to specify a message field which contains
+the topic for the message.
+
+The configuration for this writer is held in the individual Sensor
+Configurations:
+* [Enrichment](../metron-enrichment/README.md#sensor-enrichment-configuration) under the `config` element
+* [Parsers](../metron-parsers/README.md#parser-configuration) in the `parserConfig` element
+* Profiler - Unsupported currently
+
+In each of these, the kafka writer can be configured via a map which has
+the following elements:
+* `kafka.brokerUrl` : The broker URL
+* `kafka.keySerializer` : The key serializer (defaults to `StringSerializer`)
+* `kafka.valueSerializer` : The value serializer (defaults to `StringSerializer`)
+* `kafka.zkQuorum` : The zookeeper quorum
+* `kafka.requiredAcks` : Whether to require acks.
+* `kafka.topic` : The topic to write to
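+* `kafka.topicField` : The message field to pull the topic from.  If this is specified, then the producer will use the value of this field as the topic for each message, and a message in which the field is missing or null is dropped rather than written.  If it is unspecified, then the writer uses the `kafka.topic` property.  If neither is specified, then the writer falls back to its built-in default topic.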
+* `kafka.producerConfigs` : A map of kafka producer configs for advanced customization.
+ 
+
 ## HDFS Writer
 The HDFS writer included here expands on what Storm has in several ways. There's customization in syncing to HDFS, rotation policy, etc. In addition, the writer allows for users to define output paths based on the fields in the provided JSON message.  This can be defined using Stellar.
 

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index efb2418..599ecbd 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -19,6 +19,7 @@ package org.apache.metron.writer.kafka;
 
 import com.google.common.base.Joiner;
 import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -41,8 +42,11 @@ import org.apache.metron.writer.AbstractWriter;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public enum Configurations {
      BROKER("kafka.brokerUrl")
     ,KEY_SERIALIZER("kafka.keySerializer")
@@ -50,6 +54,7 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
     ,VALUE_SERIALIZER("kafka.valueSerializer")
     ,REQUIRED_ACKS("kafka.requiredAcks")
     ,TOPIC("kafka.topic")
+    ,TOPIC_FIELD("kafka.topicField")
     ,PRODUCER_CONFIGS("kafka.producerConfigs");
     ;
     String key;
@@ -81,6 +86,7 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
   private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
   private int requiredAcks = 1;
   private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
+  private String kafkaTopicField = null;
   private KafkaProducer kafkaProducer;
   private String configPrefix = null;
   private String zkQuorum = null;
@@ -120,6 +126,12 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
     this.kafkaTopic= topic;
     return this;
   }
+
+  public KafkaWriter withTopicField(String topicField) {
+    this.kafkaTopicField = topicField;
+    return this;
+  }
+
   public KafkaWriter withConfigPrefix(String prefix) {
     this.configPrefix = prefix;
     return this;
@@ -166,6 +178,10 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
     if(topic != null) {
       withTopic(topic);
     }
+    String topicField = Configurations.TOPIC_FIELD.getAndConvert(getConfigPrefix(), configMap, String.class);
+    if(topicField != null) {
+      withTopicField(topicField);
+    }
     Map<String, Object> producerConfigs = (Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(), configMap);
     if(producerConfigs != null) {
       withProducerConfigs(producerConfigs);
@@ -197,6 +213,19 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
     return producerConfig;
   }
 
+  public Optional<String> getKafkaTopic(JSONObject message) {
+    String t = null;
+    if(kafkaTopicField != null) {
+      t = (String)message.get(kafkaTopicField);
+      LOG.debug("Sending to topic: {} based on the field {}", t, kafkaTopicField);
+    }
+    else {
+      t = kafkaTopic;
+      LOG.debug("Sending to topic: {}", t);
+    }
+    return Optional.ofNullable(t);
+  }
+
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
       Iterable<Tuple> tuples, List<JSONObject> messages) {
@@ -212,10 +241,16 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
         writerResponse.addError(t, tuple);
         continue;
       }
-      Future future = kafkaProducer
-          .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
-      // we want to manage the batching
-      results.add(new AbstractMap.SimpleEntry<>(tuple, future));
+      Optional<String> topic = getKafkaTopic(message);
+      if(topic.isPresent()) {
+        Future future = kafkaProducer
+            .send(new ProducerRecord<String, String>(topic.get(), jsonMessage));
+        // we want to manage the batching
+        results.add(new AbstractMap.SimpleEntry<>(tuple, future));
+      }
+      else {
+        LOG.debug("Dropping {} because no topic is specified.", jsonMessage);
+      }
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
index 1b95430..9d201b8 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
@@ -19,10 +19,12 @@
 package org.apache.metron.writer.kafka;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -85,4 +87,65 @@ public class KafkaWriterTest {
     Assert.assertEquals(producerConfigs.get("key1"), 1);
     Assert.assertEquals(producerConfigs.get("key2"), "value2");
   }
+
+  @Test
+  public void testTopicField_bothTopicAndFieldSpecified() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.topic" , SENSOR_TYPE);
+              put("kafka.topicField" , "kafka_topic");
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Assert.assertEquals( "metron"
+                       , writer.getKafkaTopic(new JSONObject() {{
+                          put("kafka_topic", "metron");
+                         }}).get()
+                       );
+    Assert.assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() );
+
+  }
+
+  @Test
+  public void testTopicField_onlyFieldSpecified() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.topicField" , "kafka_topic");
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Assert.assertEquals( "metron"
+                       , writer.getKafkaTopic(new JSONObject() {{
+                          put("kafka_topic", "metron");
+                         }}).get()
+                       );
+    Assert.assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() );
+  }
+
+  @Test
+  public void testTopicField_neitherSpecified() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Assert.assertEquals(Constants.ENRICHMENT_TOPIC
+                       , writer.getKafkaTopic(new JSONObject() {{
+                          put("kafka_topic", "metron");
+                         }}).get()
+                       );
+    Assert.assertTrue( writer.getKafkaTopic(new JSONObject()).isPresent() );
+  }
 }