You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:44 UTC
[25/44] metron git commit: METRON-954: Create ability to change output topic of parsers from the CLI closes apache/incubator-metron#588
METRON-954: Create ability to change output topic of parsers from the CLI closes apache/incubator-metron#588
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c1c21211
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c1c21211
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c1c21211
Branch: refs/heads/Metron_0.4.0
Commit: c1c212117d6123f2897a606fedbb2583fb3bb9c3
Parents: 6c836d1
Author: cstella <ce...@gmail.com>
Authored: Tue May 16 15:24:17 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 16 15:24:17 2017 -0400
----------------------------------------------------------------------
.../parsers/topology/ParserTopologyBuilder.java | 8 +++++---
.../metron/parsers/topology/ParserTopologyCLI.java | 17 ++++++++++++++++-
.../parsers/integration/ParserIntegrationTest.java | 1 +
.../components/ParserTopologyComponent.java | 13 +++++++++++--
.../parsers/topology/ParserTopologyCLITest.java | 17 +++++++++++++++++
5 files changed, 50 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 196c19d..0c88573 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -74,7 +74,8 @@ public class ParserTopologyBuilder {
int errorWriterParallelism,
int errorWriterNumTasks,
Map<String, Object> kafkaSpoutConfig,
- Optional<String> securityProtocol
+ Optional<String> securityProtocol,
+ Optional<String> outputTopic
) throws Exception {
// fetch configuration from zookeeper
@@ -88,7 +89,7 @@ public class ParserTopologyBuilder {
.setNumTasks(spoutNumTasks);
// create the parser bolt
- ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
+ ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic);
builder.setBolt("parserBolt", parserBolt, parserParallelism)
.setNumTasks(parserNumTasks)
.shuffleGrouping("kafkaSpout");
@@ -170,6 +171,7 @@ public class ParserTopologyBuilder {
, Optional<String> securityProtocol
, ParserConfigurations configs
, SensorParserConfig parserConfig
+ , Optional<String> outputTopic
)
{
@@ -182,7 +184,7 @@ public class ParserTopologyBuilder {
createKafkaWriter( brokerUrl
, zookeeperUrl
, securityProtocol
- ).withTopic(Constants.ENRICHMENT_TOPIC) :
+ ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) :
ReflectionUtils.createInstance(parserConfig.getWriterClassName());
writer.configure(sensorType, new ParserWriterConfiguration(configs));
http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 7523333..c68e101 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.parsers.topology;
+import org.apache.metron.common.Constants;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
@@ -188,6 +189,18 @@ public class ParserTopologyCLI {
return o;
}
)
+ ,OUTPUT_TOPIC("ot", code -> {
+ Option o = new Option(code
+ , "output_topic"
+ , true
+ , "The output kafka topic for the parser. If unset, the default is " + Constants.ENRICHMENT_TOPIC
+ );
+ o.setArgName("KAFKA_TOPIC");
+ o.setRequired(false);
+ o.setType(String.class);
+ return o;
+ }
+ )
,TEST("t", code ->
{
Option o = new Option("t", "test", true, "Run in Test Mode");
@@ -296,6 +309,7 @@ public class ParserTopologyCLI {
if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
}
+ Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
@@ -308,7 +322,8 @@ public class ParserTopologyCLI {
errorParallelism,
errorNumTasks,
spoutConfig,
- securityProtocol
+ securityProtocol,
+ outputTopic
);
Config stormConf = ParserOptions.getConfig(cmd);
if (ParserOptions.TEST.has(cmd)) {
http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index defd815..fe6475d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -62,6 +62,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
.withSensorType(sensorType)
.withTopologyProperties(topologyProperties)
+ .withOutputTopic(Constants.ENRICHMENT_TOPIC)
.withBrokerUrl(kafkaComponent.getBrokerList()).build();
//UnitTestHelper.verboseLogging();
http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index b6a76d0..6ad7427 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -45,11 +45,13 @@ public class ParserTopologyComponent implements InMemoryComponent {
private String brokerUrl;
private String sensorType;
private LocalCluster stormCluster;
+ private String outputTopic;
public static class Builder {
Properties topologyProperties;
String brokerUrl;
String sensorType;
+ String outputTopic;
public Builder withTopologyProperties(Properties topologyProperties) {
this.topologyProperties = topologyProperties;
return this;
@@ -63,15 +65,21 @@ public class ParserTopologyComponent implements InMemoryComponent {
return this;
}
+ public Builder withOutputTopic(String topic) {
+ this.outputTopic = topic;
+ return this;
+ }
+
public ParserTopologyComponent build() {
- return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType);
+ return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic);
}
}
- public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType) {
+ public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) {
this.topologyProperties = topologyProperties;
this.brokerUrl = brokerUrl;
this.sensorType = sensorType;
+ this.outputTopic = outputTopic;
}
@@ -89,6 +97,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
, 1
, null
, Optional.empty()
+ , Optional.ofNullable(outputTopic)
);
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_DEBUG, true);
http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index ac73a2b..5f536a5 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -116,6 +116,7 @@ public class ParserTopologyCLITest {
.build(true);
UnitTestHelper.setLog4jLevel(Parser.class, Level.ERROR);
}
+
public void happyPath(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
@@ -147,6 +148,22 @@ public class ParserTopologyCLITest {
Assert.assertEquals(3, config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
}
+
+ @Test
+ public void testOutputTopic() throws Exception {
+ testOutputTopic(true);
+ testOutputTopic(false);
+ }
+
+ public void testOutputTopic(boolean longOpt) throws ParseException {
+ CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+ .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+ .with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic")
+ .build(longOpt);
+ Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli));
+ }
+
/**
{
"string" : "foo"