You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/11/15 23:16:26 UTC

[1/4] storm git commit: STORM-3123 - add support for Kafka security config in storm-kafka-monitor

Repository: storm
Updated Branches:
  refs/heads/master f17b3dad8 -> 29eb449ee


STORM-3123 - add support for Kafka security config in storm-kafka-monitor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40e24ce4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40e24ce4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40e24ce4

Branch: refs/heads/master
Commit: 40e24ce45a7744e2d24d4e8d3f6f146372c57824
Parents: 98ed0a8
Author: Vipin Rathor <v....@gmail.com>
Authored: Wed Jul 11 17:01:36 2018 -0700
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Mon Nov 12 18:12:18 2018 -0800

----------------------------------------------------------------------
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java       | 14 ++++++++++++--
 .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java |  9 ++++++++-
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/40e24ce4/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 78b6993..e31fad4 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.json.simple.JSONValue;
 
 /**
@@ -47,6 +48,8 @@ public class KafkaOffsetLagUtil {
     private static final String OPTION_GROUP_ID_LONG = "groupid";
     private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
     private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";
+    private static final String OPTION_CONSUMER_CONFIG_SHORT = "c";
+    private static final String OPTION_CONSUMER_CONFIG_LONG = "consumer-config";
 
     public static void main(String args[]) {
         try {
@@ -63,7 +66,8 @@ public class KafkaOffsetLagUtil {
             NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
                 new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
                     commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
-                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
+                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol,
+                    commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG));
             List<KafkaOffsetLagResult> results = getOffsetLags(newKafkaSpoutOffsetQuery);
 
             Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyedResult = keyByTopicAndPartition(results);
@@ -110,6 +114,8 @@ public class KafkaOffsetLagUtil {
                           "consumer/spout e.g. hostname1:9092,hostname2:9092");
         options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
+        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful "
+                          + "when connecting to secure kafka");
         return options;
     }
 
@@ -117,7 +123,7 @@ public class KafkaOffsetLagUtil {
      * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
      * @return log head offset, spout offset and lag for each partition
      */
-    public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+    public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) throws Exception {
         KafkaConsumer<String, String> consumer = null;
         List<KafkaOffsetLagResult> result = new ArrayList<>();
         try {
@@ -130,6 +136,10 @@ public class KafkaOffsetLagUtil {
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
                 props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
+                // Read Kafka property file for extra security options
+                if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
+                    props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
+                }
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/storm/blob/40e24ce4/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index e6c4524..53df461 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -27,12 +27,15 @@ public class NewKafkaSpoutOffsetQuery {
     private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
     private final String bootStrapBrokers; // bootstrap brokers
     private final String securityProtocol; // security protocol to connect to kafka
+    private final String consumerConfig; // security configuration file to connect to secure kafka
 
-    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol) {
+    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol,
+        String consumerConfig) {
         this.topics = topics;
         this.bootStrapBrokers = bootstrapBrokers;
         this.consumerGroupId = consumerGroupId;
         this.securityProtocol = securityProtocol;
+        this.consumerConfig = consumerConfig;
     }
 
     public String getTopics() {
@@ -51,6 +54,10 @@ public class NewKafkaSpoutOffsetQuery {
         return this.securityProtocol;
     }
 
+    public String getConsumerConfig() {
+        return this.consumerConfig;
+    }
+
     @Override
     public String toString() {
         return "NewKafkaSpoutOffsetQuery{" +


[3/4] storm git commit: STORM-3123: Address review comments

Posted by ka...@apache.org.
STORM-3123: Address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d8d88374
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d8d88374
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d8d88374

Branch: refs/heads/master
Commit: d8d88374d992a7f3b6455e26fececa2bf50d5165
Parents: fa0b862
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Nov 15 10:38:51 2018 -0800
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Thu Nov 15 10:38:58 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/storm/kafka/spout/KafkaSpout.java     |  2 ++
 .../apache/storm/kafka/monitor/KafkaOffsetLagUtil.java    |  4 ++--
 .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java     | 10 +++++-----
 .../src/jvm/org/apache/storm/utils/TopologySpoutLag.java  |  4 ++--
 4 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index cc8cb77..7d7a856 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -695,6 +695,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
             if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
                 configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
+            } else {
+                LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey());
             }
         }
         return configuration;

http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 1d53436..398ebae 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -138,8 +138,8 @@ public class KafkaOffsetLagUtil {
                 props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
             }
             // Read property file for extra consumer properties
-            if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
-                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
+            if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != null) {
+                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName()));
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index 53df461..0327ea0 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -27,15 +27,15 @@ public class NewKafkaSpoutOffsetQuery {
     private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
     private final String bootStrapBrokers; // bootstrap brokers
     private final String securityProtocol; // security protocol to connect to kafka
-    private final String consumerConfig; // security configuration file to connect to secure kafka
+    private final String consumerPropertiesFileName; // properties file containing additional kafka consumer configs
 
     public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol,
-        String consumerConfig) {
+        String consumerPropertiesFileName) {
         this.topics = topics;
         this.bootStrapBrokers = bootstrapBrokers;
         this.consumerGroupId = consumerGroupId;
         this.securityProtocol = securityProtocol;
-        this.consumerConfig = consumerConfig;
+        this.consumerPropertiesFileName = consumerPropertiesFileName;
     }
 
     public String getTopics() {
@@ -54,8 +54,8 @@ public class NewKafkaSpoutOffsetQuery {
         return this.securityProtocol;
     }
 
-    public String getConsumerConfig() {
-        return this.consumerConfig;
+    public String getConsumerPropertiesFileName() {
+        return this.consumerPropertiesFileName;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index adb53d8..eb4e4b6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -85,7 +85,7 @@ public class TopologySpoutLag {
         return commands;
     }
 
-    private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
+    private static File createExtraPropertiesFile(Map<String, Object> jsonConf) {
         File file = null;
         Map<String, String> extraProperties = new HashMap<>();
         for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
@@ -149,7 +149,7 @@ public class TopologySpoutLag {
             }
             commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));
 
-            File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
+            File extraPropertiesFile = createExtraPropertiesFile(jsonMap);
             if (extraPropertiesFile != null) {
                 commands.add("-c");
                 commands.add(extraPropertiesFile.getAbsolutePath());


[4/4] storm git commit: Merge branch 'STORM-3123' of https://github.com/arunmahadevan/storm into STORM-3123

Posted by ka...@apache.org.
Merge branch 'STORM-3123' of https://github.com/arunmahadevan/storm into STORM-3123


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29eb449e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29eb449e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29eb449e

Branch: refs/heads/master
Commit: 29eb449ee2518d4d2f750fa32b667b671912cbad
Parents: f17b3da d8d8837
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Authored: Fri Nov 16 08:14:47 2018 +0900
Committer: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Committed: Fri Nov 16 08:14:47 2018 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 23 +++++++-
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 14 ++++-
 .../kafka/monitor/NewKafkaSpoutOffsetQuery.java |  9 ++-
 .../apache/storm/utils/TopologySpoutLag.java    | 60 +++++++++++++++++---
 4 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag

Posted by ka...@apache.org.
STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag

Change-Id: Id6e3ce120cc813adbe611d085cd4bc3ebd0ff590


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa0b8624
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa0b8624
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa0b8624

Branch: refs/heads/master
Commit: fa0b8624492630d0f5c3c89e6bf7a5d20ac59d8b
Parents: 40e24ce
Author: Arun Mahadevan <ar...@apache.org>
Authored: Mon Nov 12 11:19:31 2018 -0800
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue Nov 13 00:38:50 2018 -0800

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 21 ++++++-
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 ++--
 .../apache/storm/utils/TopologySpoutLag.java    | 60 +++++++++++++++++---
 3 files changed, 77 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 1ee0a5c..cc8cb77 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -692,11 +692,28 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         configuration.put(configKeyPrefix + "topics", getTopicsString());
 
         configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
-        configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
-        configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
+        for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
+            if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
+                configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
+            }
+        }
         return configuration;
     }
 
+    private boolean isPrimitiveOrWrapper(Class<?> type) {
+        if (type == null) {
+            return false;
+        }
+        return type.isPrimitive() || isWrapper(type);
+    }
+
+    private boolean isWrapper(Class<?> type) {
+        return type == Double.class || type == Float.class || type == Long.class
+                || type == Integer.class || type == Short.class || type == Character.class
+                || type == Byte.class || type == Boolean.class || type == String.class;
+    }
+
+
     private String getTopicsString() {
         return kafkaSpoutConfig.getTopicFilter().getTopicsString();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index e31fad4..1d53436 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -114,8 +114,8 @@ public class KafkaOffsetLagUtil {
                           "consumer/spout e.g. hostname1:9092,hostname2:9092");
         options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
-        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful "
-                          + "when connecting to secure kafka");
+        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " +
+                "Kafka consumer properties");
         return options;
     }
 
@@ -136,10 +136,10 @@ public class KafkaOffsetLagUtil {
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
                 props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
-                // Read Kafka property file for extra security options
-                if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
-                    props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
-                }
+            }
+            // Read property file for extra consumer properties
+            if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
+                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index ce3b22b..adb53d8 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -17,11 +17,17 @@
 package org.apache.storm.utils;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
@@ -41,6 +47,9 @@ public class TopologySpoutLag {
     private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
     private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
     private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
+    private static final String SECURITY_PROTOCOL_CONFIG = CONFIG_KEY_PREFIX + "security.protocol";
+    private static final Set<String> ALL_CONFIGS = new HashSet<>(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG,
+            BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG));
     private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
 
     public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map<String, Object> topologyConf) {
@@ -68,7 +77,7 @@ public class TopologySpoutLag {
         commands.add((String) jsonConf.get(GROUPID_CONFIG));
         commands.add("-b");
         commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
-        String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
+        String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG);
         if (securityProtocol != null && !securityProtocol.isEmpty()) {
             commands.add("-s");
             commands.add(securityProtocol);
@@ -76,6 +85,30 @@ public class TopologySpoutLag {
         return commands;
     }
 
+    private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
+        File file = null;
+        Map<String, String> extraProperties = new HashMap<>();
+        for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
+            if (conf.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(conf.getKey())) {
+                extraProperties.put(conf.getKey().substring(CONFIG_KEY_PREFIX.length()), conf.getValue().toString());
+            }
+        }
+        if (!extraProperties.isEmpty()) {
+            try {
+                file = File.createTempFile("kafka-consumer-extra", "props");
+                file.deleteOnExit();
+                Properties properties = new Properties();
+                properties.putAll(extraProperties);
+                try(FileOutputStream fos = new FileOutputStream(file)) {
+                    properties.store(fos, "Kafka consumer extra properties");
+                }
+            } catch (IOException ex) {
+                // ignore
+            }
+        }
+        return file;
+    }
+
     private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec)
         throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
@@ -116,18 +149,29 @@ public class TopologySpoutLag {
             }
             commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));
 
+            File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
+            if (extraPropertiesFile != null) {
+                commands.add("-c");
+                commands.add(extraPropertiesFile.getAbsolutePath());
+            }
             logger.debug("Command to run: {}", commands);
 
             // if commands contains one or more null value, spout is compiled with lower version of storm-kafka-client
             if (!commands.contains(null)) {
-                String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
-
                 try {
-                    result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
-                } catch (ParseException e) {
-                    logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
-                    // json parsing fail -> error received
-                    errorMsg = resultFromMonitor;
+                    String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
+
+                    try {
+                        result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
+                    } catch (ParseException e) {
+                        logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
+                        // json parsing fail -> error received
+                        errorMsg = resultFromMonitor;
+                    }
+                } finally {
+                    if (extraPropertiesFile != null) {
+                        extraPropertiesFile.delete();
+                    }
                 }
             }
         }