You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:44 UTC

[25/44] metron git commit: METRON-954: Create ability to change output topic of parsers from the CLI closes apache/incubator-metron#588

METRON-954: Create ability to change output topic of parsers from the CLI closes apache/incubator-metron#588


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c1c21211
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c1c21211
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c1c21211

Branch: refs/heads/Metron_0.4.0
Commit: c1c212117d6123f2897a606fedbb2583fb3bb9c3
Parents: 6c836d1
Author: cstella <ce...@gmail.com>
Authored: Tue May 16 15:24:17 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 16 15:24:17 2017 -0400

----------------------------------------------------------------------
 .../parsers/topology/ParserTopologyBuilder.java    |  8 +++++---
 .../metron/parsers/topology/ParserTopologyCLI.java | 17 ++++++++++++++++-
 .../parsers/integration/ParserIntegrationTest.java |  1 +
 .../components/ParserTopologyComponent.java        | 13 +++++++++++--
 .../parsers/topology/ParserTopologyCLITest.java    | 17 +++++++++++++++++
 5 files changed, 50 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 196c19d..0c88573 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -74,7 +74,8 @@ public class ParserTopologyBuilder {
                                       int errorWriterParallelism,
                                       int errorWriterNumTasks,
                                       Map<String, Object> kafkaSpoutConfig,
-                                      Optional<String> securityProtocol
+                                      Optional<String> securityProtocol,
+                                      Optional<String> outputTopic
   ) throws Exception {
 
     // fetch configuration from zookeeper
@@ -88,7 +89,7 @@ public class ParserTopologyBuilder {
             .setNumTasks(spoutNumTasks);
 
     // create the parser bolt
-    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
+    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic);
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
             .setNumTasks(parserNumTasks)
             .shuffleGrouping("kafkaSpout");
@@ -170,6 +171,7 @@ public class ParserTopologyBuilder {
                                             , Optional<String> securityProtocol
                                             , ParserConfigurations configs
                                             , SensorParserConfig parserConfig
+                                            , Optional<String> outputTopic
                                             )
   {
 
@@ -182,7 +184,7 @@ public class ParserTopologyBuilder {
             createKafkaWriter( brokerUrl
                              , zookeeperUrl
                              , securityProtocol
-                             ).withTopic(Constants.ENRICHMENT_TOPIC) :
+                             ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) :
             ReflectionUtils.createInstance(parserConfig.getWriterClassName());
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 

http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 7523333..c68e101 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.topology;
 
+import org.apache.metron.common.Constants;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
@@ -188,6 +189,18 @@ public class ParserTopologyCLI {
       return o;
     }
     )
+    ,OUTPUT_TOPIC("ot", code -> {
+      Option o = new Option(code
+                           , "output_topic"
+                           , true
+                           , "The output kafka topic for the parser.  If unset, the default is " + Constants.ENRICHMENT_TOPIC
+                           );
+      o.setArgName("KAFKA_TOPIC");
+      o.setRequired(false);
+      o.setType(String.class);
+      return o;
+    }
+    )
     ,TEST("t", code ->
     {
       Option o = new Option("t", "test", true, "Run in Test Mode");
@@ -296,6 +309,7 @@ public class ParserTopologyCLI {
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
         spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
       }
+      Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
       Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
       securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
       TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
@@ -308,7 +322,8 @@ public class ParserTopologyCLI {
               errorParallelism,
               errorNumTasks,
               spoutConfig,
-              securityProtocol
+              securityProtocol,
+              outputTopic
       );
       Config stormConf = ParserOptions.getConfig(cmd);
       if (ParserOptions.TEST.has(cmd)) {

http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index defd815..fe6475d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -62,6 +62,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
             .withSensorType(sensorType)
             .withTopologyProperties(topologyProperties)
+            .withOutputTopic(Constants.ENRICHMENT_TOPIC)
             .withBrokerUrl(kafkaComponent.getBrokerList()).build();
 
     //UnitTestHelper.verboseLogging();

http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index b6a76d0..6ad7427 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -45,11 +45,13 @@ public class ParserTopologyComponent implements InMemoryComponent {
   private String brokerUrl;
   private String sensorType;
   private LocalCluster stormCluster;
+  private String outputTopic;
 
   public static class Builder {
     Properties topologyProperties;
     String brokerUrl;
     String sensorType;
+    String outputTopic;
     public Builder withTopologyProperties(Properties topologyProperties) {
       this.topologyProperties = topologyProperties;
       return this;
@@ -63,15 +65,21 @@ public class ParserTopologyComponent implements InMemoryComponent {
       return this;
     }
 
+    public Builder withOutputTopic(String topic) {
+      this.outputTopic = topic;
+      return this;
+    }
+
     public ParserTopologyComponent build() {
-      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType);
+      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic);
     }
   }
 
-  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType) {
+  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) {
     this.topologyProperties = topologyProperties;
     this.brokerUrl = brokerUrl;
     this.sensorType = sensorType;
+    this.outputTopic = outputTopic;
   }
 
 
@@ -89,6 +97,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
                                                                    , 1
                                                                    , null
                                                                    , Optional.empty()
+                                                                   , Optional.ofNullable(outputTopic)
                                                                    );
       Map<String, Object> stormConf = new HashMap<>();
       stormConf.put(Config.TOPOLOGY_DEBUG, true);

http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index ac73a2b..5f536a5 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -116,6 +116,7 @@ public class ParserTopologyCLITest {
                                       .build(true);
     UnitTestHelper.setLog4jLevel(Parser.class, Level.ERROR);
   }
+
   public void happyPath(boolean longOpt) throws ParseException {
     CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
@@ -147,6 +148,22 @@ public class ParserTopologyCLITest {
     Assert.assertEquals(3, config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
     Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
   }
+
+  @Test
+  public void testOutputTopic() throws Exception {
+    testOutputTopic(true);
+    testOutputTopic(false);
+  }
+
+  public void testOutputTopic(boolean longOpt) throws ParseException {
+     CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+                                      .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic")
+                                      .build(longOpt);
+    Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli));
+  }
+
   /**
     {
       "string" : "foo"