You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/11/15 23:16:28 UTC

[3/4] storm git commit: STORM-3123: Address review comments

STORM-3123: Address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d8d88374
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d8d88374
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d8d88374

Branch: refs/heads/master
Commit: d8d88374d992a7f3b6455e26fececa2bf50d5165
Parents: fa0b862
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Nov 15 10:38:51 2018 -0800
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Thu Nov 15 10:38:58 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/storm/kafka/spout/KafkaSpout.java     |  2 ++
 .../apache/storm/kafka/monitor/KafkaOffsetLagUtil.java    |  4 ++--
 .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java     | 10 +++++-----
 .../src/jvm/org/apache/storm/utils/TopologySpoutLag.java  |  4 ++--
 4 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index cc8cb77..7d7a856 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -695,6 +695,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
             if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
                 configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
+            } else {
+                LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey());
             }
         }
         return configuration;

http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 1d53436..398ebae 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -138,8 +138,8 @@ public class KafkaOffsetLagUtil {
                 props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
             }
             // Read property file for extra consumer properties
-            if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
-                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
+            if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != null) {
+                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName()));
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index 53df461..0327ea0 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -27,15 +27,15 @@ public class NewKafkaSpoutOffsetQuery {
     private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
     private final String bootStrapBrokers; // bootstrap brokers
     private final String securityProtocol; // security protocol to connect to kafka
-    private final String consumerConfig; // security configuration file to connect to secure kafka
+    private final String consumerPropertiesFileName; // properties file containing additional kafka consumer configs
 
     public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol,
-        String consumerConfig) {
+        String consumerPropertiesFileName) {
         this.topics = topics;
         this.bootStrapBrokers = bootstrapBrokers;
         this.consumerGroupId = consumerGroupId;
         this.securityProtocol = securityProtocol;
-        this.consumerConfig = consumerConfig;
+        this.consumerPropertiesFileName = consumerPropertiesFileName;
     }
 
     public String getTopics() {
@@ -54,8 +54,8 @@ public class NewKafkaSpoutOffsetQuery {
         return this.securityProtocol;
     }
 
-    public String getConsumerConfig() {
-        return this.consumerConfig;
+    public String getConsumerPropertiesFileName() {
+        return this.consumerPropertiesFileName;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index adb53d8..eb4e4b6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -85,7 +85,7 @@ public class TopologySpoutLag {
         return commands;
     }
 
-    private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
+    private static File createExtraPropertiesFile(Map<String, Object> jsonConf) {
         File file = null;
         Map<String, String> extraProperties = new HashMap<>();
         for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
@@ -149,7 +149,7 @@ public class TopologySpoutLag {
             }
             commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));
 
-            File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
+            File extraPropertiesFile = createExtraPropertiesFile(jsonMap);
             if (extraPropertiesFile != null) {
                 commands.add("-c");
                 commands.add(extraPropertiesFile.getAbsolutePath());