You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/11/15 23:16:27 UTC
[2/4] storm git commit: STORM-3123: Changes to return extra
properties from KafkaSpout and use it in TopologySpoutLag
STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag
Change-Id: Id6e3ce120cc813adbe611d085cd4bc3ebd0ff590
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa0b8624
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa0b8624
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa0b8624
Branch: refs/heads/master
Commit: fa0b8624492630d0f5c3c89e6bf7a5d20ac59d8b
Parents: 40e24ce
Author: Arun Mahadevan <ar...@apache.org>
Authored: Mon Nov 12 11:19:31 2018 -0800
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue Nov 13 00:38:50 2018 -0800
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 21 ++++++-
.../storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 ++--
.../apache/storm/utils/TopologySpoutLag.java | 60 +++++++++++++++++---
3 files changed, 77 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 1ee0a5c..cc8cb77 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -692,11 +692,28 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
configuration.put(configKeyPrefix + "topics", getTopicsString());
configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
- configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
- configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
+ for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
+ if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
+ configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
+ }
+ }
return configuration;
}
+ private boolean isPrimitiveOrWrapper(Class<?> type) {
+ if (type == null) {
+ return false;
+ }
+ return type.isPrimitive() || isWrapper(type);
+ }
+
+ private boolean isWrapper(Class<?> type) {
+ return type == Double.class || type == Float.class || type == Long.class
+ || type == Integer.class || type == Short.class || type == Character.class
+ || type == Byte.class || type == Boolean.class || type == String.class;
+ }
+
+
private String getTopicsString() {
return kafkaSpoutConfig.getTopicFilter().getTopicsString();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index e31fad4..1d53436 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -114,8 +114,8 @@ public class KafkaOffsetLagUtil {
"consumer/spout e.g. hostname1:9092,hostname2:9092");
options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer");
options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
- options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful "
- + "when connecting to secure kafka");
+ options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " +
+ "Kafka consumer properties");
return options;
}
@@ -136,10 +136,10 @@ public class KafkaOffsetLagUtil {
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
- // Read Kafka property file for extra security options
- if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
- props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
- }
+ }
+ // Read property file for extra consumer properties
+ if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
+ props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
}
List<TopicPartition> topicPartitionList = new ArrayList<>();
consumer = new KafkaConsumer<>(props);
http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index ce3b22b..adb53d8 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -17,11 +17,17 @@
package org.apache.storm.utils;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
@@ -41,6 +47,9 @@ public class TopologySpoutLag {
private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
+ private static final String SECURITY_PROTOCOL_CONFIG = CONFIG_KEY_PREFIX + "security.protocol";
+ private static final Set<String> ALL_CONFIGS = new HashSet<>(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG,
+ BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG));
private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map<String, Object> topologyConf) {
@@ -68,7 +77,7 @@ public class TopologySpoutLag {
commands.add((String) jsonConf.get(GROUPID_CONFIG));
commands.add("-b");
commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
- String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
+ String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG);
if (securityProtocol != null && !securityProtocol.isEmpty()) {
commands.add("-s");
commands.add(securityProtocol);
@@ -76,6 +85,30 @@ public class TopologySpoutLag {
return commands;
}
+ private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
+ File file = null;
+ Map<String, String> extraProperties = new HashMap<>();
+ for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
+ if (conf.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(conf.getKey())) {
+ extraProperties.put(conf.getKey().substring(CONFIG_KEY_PREFIX.length()), conf.getValue().toString());
+ }
+ }
+ if (!extraProperties.isEmpty()) {
+ try {
+ file = File.createTempFile("kafka-consumer-extra", "props");
+ file.deleteOnExit();
+ Properties properties = new Properties();
+ properties.putAll(extraProperties);
+ try(FileOutputStream fos = new FileOutputStream(file)) {
+ properties.store(fos, "Kafka consumer extra properties");
+ }
+ } catch (IOException ex) {
+ // ignore
+ }
+ }
+ return file;
+ }
+
private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec)
throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
@@ -116,18 +149,29 @@ public class TopologySpoutLag {
}
commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));
+ File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
+ if (extraPropertiesFile != null) {
+ commands.add("-c");
+ commands.add(extraPropertiesFile.getAbsolutePath());
+ }
logger.debug("Command to run: {}", commands);
// if commands contains one or more null value, spout is compiled with lower version of storm-kafka-client
if (!commands.contains(null)) {
- String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
-
try {
- result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
- } catch (ParseException e) {
- logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
- // json parsing fail -> error received
- errorMsg = resultFromMonitor;
+ String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
+
+ try {
+ result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
+ } catch (ParseException e) {
+ logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
+ // json parsing fail -> error received
+ errorMsg = resultFromMonitor;
+ }
+ } finally {
+ if (extraPropertiesFile != null) {
+ extraPropertiesFile.delete();
+ }
}
}
}