Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2017/06/10 10:53:30 UTC

[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2155

    STORM-2548: Simplify KafkaSpoutConfig to avoid duplicating KafkaConsumer configuration parameters
    
    This is put up for discussion, since a few people on the mailing list expressed a desire for a simpler KafkaSpoutConfig.
    
    @harshach and @priyank5485 I think this is something like what you were asking for?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2548

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2155
    
----
commit 5b3d0e1523dda680c024f4cc6d65b651abf56984
Author: Stig Rohde Døssing <st...@gmail.com>
Date:   2017-06-10T10:49:53Z

    STORM-2548: Simplify KafkaSpoutConfig to avoid duplicating KafkaConsumer configuration parameters

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124580202
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    --- End diff --
    
    Create a KafkaSpoutConfig builder with default property values. Properties can be overridden with the respective builder set methods.



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r121530308
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -79,16 +78,46 @@
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    +     * Users who need different deserializers should use the Builder constructors instead, 
    +     * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
    +     * @param bootstrapServers The bootstrap servers for the consumer
    +     * @param topics The topics to subscribe to
    +     * @return The new builder
    +     */
         public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
    -        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +        return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
         }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    +     * Users who need different deserializers should use the Builder constructors instead, 
    +     * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
    +     * @param bootstrapServers The bootstrap servers for the consumer
    +     * @param topics The topics to subscribe to
    +     * @return The new builder
    +     */
         public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
    -        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +        return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
         }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    +     * Users who need different deserializers should use the Builder constructors instead, 
    +     * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
    +     * @param bootstrapServers The bootstrap servers for the consumer
    +     * @param topics The topic pattern to subscribe to
    +     * @return The new builder
    +     */
         public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
    -        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +        return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
    +    }
    +    
    +    private static Builder<String, String> setDefaultStringDeserializers(Builder<String, String> builder) {
    +        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    --- End diff --
    
    I think ByteArrayDeserializer makes more sense. Not a strong preference though.
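
To make the trade-off in this comment concrete, here is a minimal sketch of how deserializers could be chosen through a generic setProp call once the dedicated constructor parameters are gone. BuilderSketch and stringBuilder are hypothetical stand-ins written for this note, not the real KafkaSpoutConfig.Builder; only the property names key.deserializer and value.deserializer and the two deserializer class names come from Kafka itself. Class names are passed as strings so that the sketch needs no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaSpoutConfig.Builder; not the real Storm class.
class BuilderSketch<K, V> {
    // Kafka consumer property names for the two deserializers.
    static final String KEY_DESERIALIZER = "key.deserializer";
    static final String VALUE_DESERIALIZER = "value.deserializer";
    static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";
    static final String BYTE_ARRAY_DESERIALIZER =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    private final Map<String, Object> kafkaProps = new HashMap<>();

    BuilderSketch(String bootstrapServers) {
        kafkaProps.put("bootstrap.servers", bootstrapServers);
    }

    // Same shape as the setProp(String, Object) method shown in the diff.
    BuilderSketch<K, V> setProp(String key, Object value) {
        kafkaProps.put(key, value);
        return this;
    }

    // Returns a copy so callers cannot mutate the builder's state.
    Map<String, Object> props() {
        return new HashMap<>(kafkaProps);
    }

    // Convenience factory that pre-populates String deserializers.
    static BuilderSketch<String, String> stringBuilder(String bootstrapServers) {
        return new BuilderSketch<String, String>(bootstrapServers)
            .setProp(KEY_DESERIALIZER, STRING_DESERIALIZER)
            .setProp(VALUE_DESERIALIZER, STRING_DESERIALIZER);
    }
}

class DeserializerDemo {
    public static void main(String[] args) {
        // Default path: the factory sets String deserializers.
        System.out.println(BuilderSketch.stringBuilder("localhost:9092").props());

        // Custom path: use the constructor and set byte-array deserializers explicitly.
        BuilderSketch<byte[], byte[]> raw = new BuilderSketch<byte[], byte[]>("localhost:9092")
            .setProp(BuilderSketch.KEY_DESERIALIZER, BuilderSketch.BYTE_ARRAY_DESERIALIZER)
            .setProp(BuilderSketch.VALUE_DESERIALIZER, BuilderSketch.BYTE_ARRAY_DESERIALIZER);
        System.out.println(raw.props());
    }
}
```

Whichever default the factory picks, callers who need something else take the constructor path and set both properties themselves, so the choice mainly affects what the shortest call site produces.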



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @hmcl Addressed your comments, and updated the README to reflect the API changes.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @srdo I agree with you concerning this [comment](https://github.com/apache/storm/pull/2155#issuecomment-311751331). There is really not a much better way to do this. I agree with your comment "move the FirstPollOffsetStrategy default into a static field in KafkaSpoutConfig like the rest for consistency" - let's do it.
    
    I think that what confused me is the way the whole class is organized. For instance, the state of KafkaSpoutConfig is not at the top of the class as one would expect. Perhaps we should follow the usual convention and move the [constructor and fields](https://github.com/srdo/storm/blob/142d1f8e2609d1a48b6be71084987fe8cdc46a3f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L302-#L331) to the top, after the constants, then put the static inner classes ([here](https://github.com/srdo/storm/blob/142d1f8e2609d1a48b6be71084987fe8cdc46a3f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L59-#L79) and [here](https://github.com/srdo/storm/blob/142d1f8e2609d1a48b6be71084987fe8cdc46a3f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L125-#L300)), then the [factory methods](https://github.com/srdo/storm/blob/142d1f8e2609d1a48b6be71084987fe8cdc46a3f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L81-#L123), and finally all the [accessor methods](https://github.com/srdo/storm/blob/142d1f8e2609d1a48b6be71084987fe8cdc46a3f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L333-#L398).
    
    On a different note, I noticed that this PR now has the changes from the [STORM-2541 PR](https://github.com/apache/storm/pull/2150) included in its single commit. I think that this PR should have only one commit, with the refactoring of KafkaSpoutConfig. The [STORM-2541 PR](https://github.com/apache/storm/pull/2150) should have two commits: this PR's commit and its own commit. That would make it easy to merge while not losing Git history.
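
The class layout proposed in this comment, together with the idea of keeping the FirstPollOffsetStrategy default in a static field, might look roughly like the sketch below. LayoutSketch is a hypothetical, trimmed-down stand-in for KafkaSpoutConfig; the default values are illustrative assumptions and not the values used by Storm.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaSpoutConfig; only the ordering of members matters here.
class LayoutSketch {
    // 1. Constants, with every default declared in one place (values are illustrative).
    static final long DEFAULT_POLL_TIMEOUT_MS = 200;
    static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY =
        FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;

    // 2. State and constructor.
    private final Map<String, Object> kafkaProps;
    private final long pollTimeoutMs;
    private final FirstPollOffsetStrategy firstPollOffsetStrategy;

    private LayoutSketch(Builder builder) {
        this.kafkaProps = new HashMap<>(builder.kafkaProps);
        this.pollTimeoutMs = builder.pollTimeoutMs;
        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    }

    // 3. Static inner types.
    enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    static class Builder {
        private final Map<String, Object> kafkaProps = new HashMap<>();
        // Builder fields start from the shared static defaults.
        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;

        Builder(String bootstrapServers) {
            kafkaProps.put("bootstrap.servers", bootstrapServers);
        }

        Builder setFirstPollOffsetStrategy(FirstPollOffsetStrategy strategy) {
            this.firstPollOffsetStrategy = strategy;
            return this;
        }

        LayoutSketch build() {
            return new LayoutSketch(this);
        }
    }

    // 4. Factory methods.
    static Builder builder(String bootstrapServers) {
        return new Builder(bootstrapServers);
    }

    // 5. Accessors.
    long getPollTimeoutMs() {
        return pollTimeoutMs;
    }

    FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
        return firstPollOffsetStrategy;
    }

    public static void main(String[] args) {
        LayoutSketch config = LayoutSketch.builder("localhost:9092").build();
        System.out.println(config.getFirstPollOffsetStrategy());
    }
}
```

Declaring the defaults as constants lets the Javadoc of each setter refer to them by name, and a reader sees the full set of defaults before reaching any behavior.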




[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124601205
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -79,16 +78,46 @@
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    --- End diff --
    
    Sure, will shorten this



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    +1(NB) given that people prefer keeping the Builder class around.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    There were some +1s earlier, but this branch has changed a bunch since. I'd appreciate it if someone could give it a skim and check that it's still good.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    I'll squash the commits soon. I just wanted people to have a chance to review the changes without having to read the entire diff again.



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124586447
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will use
    +         * @param subscription The subscription defining which topics and partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) {
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at runtime,
    -            // but because some translators will work no matter what the generics
    -            // are I thought it best not to force someone to reset the translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a value.deserializer
    -         * This is the same as setting value.deserializer, but overrides it.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
              */
             public Builder<K,V> setProp(String key, Object value) {
                 kafkaProps.put(key, value);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Map<String, Object> props) {
                 kafkaProps.putAll(props);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
    --- End diff --
    
    Set multiple Kafka consumer properties



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    Actually, you're right: building them on each other would be faster. I'll do as you suggested.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @srdo Thanks for your diligence and awesome work refactoring this code. It just made it much better.



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2155



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @srdo Overall, this looks good. But I also think we should get rid of the Builder class entirely. The reason is that it does not work well with Flux YAML, since you create the object using builder.build(). Let me know your thoughts on that.
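
One way the concern in this comment could be addressed without removing the Builder is to give the config class a public constructor that accepts the builder, so a declarative wiring tool never has to call build(). The sketch below assumes such a tool instantiates components reflectively through public constructors; it uses plain java.lang.reflect to imitate that, and FluxFriendlyConfig and ReflectiveWiringDemo are hypothetical names, not Storm or Flux classes.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a spout config; not the real KafkaSpoutConfig.
class FluxFriendlyConfig {
    private final Map<String, Object> kafkaProps;

    // Public constructor: a wiring tool can call this directly with a configured builder.
    public FluxFriendlyConfig(Builder builder) {
        this.kafkaProps = new HashMap<>(builder.kafkaProps);
    }

    public Object getProp(String key) {
        return kafkaProps.get(key);
    }

    public static class Builder {
        private final Map<String, Object> kafkaProps = new HashMap<>();

        public Builder(String bootstrapServers) {
            kafkaProps.put("bootstrap.servers", bootstrapServers);
        }

        public Builder setProp(String key, Object value) {
            kafkaProps.put(key, value);
            return this;
        }

        // Still available for code that prefers the fluent style.
        public FluxFriendlyConfig build() {
            return new FluxFriendlyConfig(this);
        }
    }
}

class ReflectiveWiringDemo {
    // Imitates a YAML-driven tool: look up a public constructor and invoke it.
    static FluxFriendlyConfig construct(FluxFriendlyConfig.Builder builder) {
        try {
            Constructor<FluxFriendlyConfig> ctor =
                FluxFriendlyConfig.class.getConstructor(FluxFriendlyConfig.Builder.class);
            return ctor.newInstance(builder);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not construct config reflectively", e);
        }
    }

    public static void main(String[] args) {
        FluxFriendlyConfig.Builder builder = new FluxFriendlyConfig.Builder("localhost:9092")
            .setProp("group.id", "example-group");
        System.out.println(construct(builder).getProp("group.id"));
    }
}
```

Both paths yield an equivalent object, so Java callers can keep the fluent builder.build() style while a constructor-based tool uses the same builder as a plain argument.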



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124600901
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will use
    +         * @param subscription The subscription defining which topics and partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) {
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at runtime,
    -            // but because some translators will work no matter what the generics
    -            // are I thought it best not to force someone to reset the translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a value.deserializer
    -         * This is the same as setting value.deserializer, but overrides it.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
              */
             public Builder<K,V> setProp(String key, Object value) {
                 kafkaProps.put(key, value);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Map<String, Object> props) {
                 kafkaProps.putAll(props);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Properties props) {
                 for (String name: props.stringPropertyNames()) {
    --- End diff --
    
    Nice catch. How about we use putAll to copy the properties into kafkaProps instead? I'm not sure I like the method filtering the input properties; I'd rather throw an error if a key is not a String. At the same time, I don't think it should be KafkaSpoutConfig's job to validate KafkaConsumer parameters. The KafkaConsumer will throw the error itself if the keys are not Strings: https://github.com/apache/kafka/blob/efb060c57f05d1d586bb14c016b0187c60f8e994/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L60
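
The putAll idea above could look roughly like the following. This is a minimal, self-contained sketch rather than the code in the pull request: the class `PutAllSketch` and the helper `copyAll` are hypothetical names, and the raw cast is an assumption about how a `Properties` object (a `Map<Object, Object>`) might be copied into a `Map<String, Object>` without filtering.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PutAllSketch {

    // Hypothetical helper: copies every entry without filtering. The raw cast
    // is needed because Properties is a Map<Object, Object>; checking the keys
    // is left to whoever consumes the resulting map.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Object> copyAll(Properties props) {
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.putAll((Map) props);
        return kafkaProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "demo-group");  // String value
        props.put("max.poll.records", 500);   // non-String value

        Map<String, Object> copied = copyAll(props);
        System.out.println(copied.get("max.poll.records")); // 500
        System.out.println(copied.size());                  // 2
    }
}
```

One difference worth keeping in mind: copying entries this way does not include values from a `Properties` object's default list, whereas `stringPropertyNames()` does include default keys.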



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    Yeah... I also think that's the ideal way to do it. Squash at the end and have a new commit addressing each batch of code review comments.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    +1



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124586375
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will use
    +         * @param subscription The subscription defining which topics and partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) {
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at runtime,
    -            // but because some translators will work no matter what the generics
    -            // are I thought it best not to force someone to reset the translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a value.deserializer
    -         * This is the same as setting value.deserializer, but overrides it.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
              */
             public Builder<K,V> setProp(String key, Object value) {
                 kafkaProps.put(key, value);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
    --- End diff --
    
    Suggested wording: "Set multiple Kafka consumer properties."



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    (copying from the mailing list so it is easier to track the discussion if someone reads this later)
    
    With regard to getting rid of the builder pattern, I think it is a pretty nice pattern for Java. It looks to me like it should be possible to declare and configure the builder with "component:", and then pass it to the KafkaSpoutConfig constructor with "ref:" afterwards (which lets you avoid calling build()). Doesn't this work?
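
The idea of handing a configured builder straight to the config constructor can be sketched in plain Java. The classes below (`SpoutConfig`, `Builder`) are hypothetical stand-ins, not the real storm-kafka-client types; the sketch only assumes, as the comment above suggests, that the config class has a constructor accepting its builder.

```java
import java.util.HashMap;
import java.util.Map;

public class BuilderRefSketch {

    // Hypothetical stand-in for KafkaSpoutConfig: immutable, with a
    // constructor that accepts the builder directly.
    static final class SpoutConfig {
        private final Map<String, Object> kafkaProps;

        SpoutConfig(Builder builder) {
            this.kafkaProps = new HashMap<>(builder.kafkaProps);
        }

        Object getProp(String key) {
            return kafkaProps.get(key);
        }
    }

    // Hypothetical stand-in for KafkaSpoutConfig.Builder.
    static final class Builder {
        private final Map<String, Object> kafkaProps = new HashMap<>();

        Builder(String bootstrapServers) {
            kafkaProps.put("bootstrap.servers", bootstrapServers);
        }

        Builder setProp(String key, Object value) {
            kafkaProps.put(key, value);
            return this;
        }

        // Conventional entry point for Java callers.
        SpoutConfig build() {
            return new SpoutConfig(this);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder("localhost:9092").setProp("group.id", "demo-group");

        // A declarative tool can configure the builder as one object and then
        // pass it to the constructor, so it never has to call build().
        SpoutConfig viaConstructor = new SpoutConfig(builder);
        SpoutConfig viaBuild = builder.build();

        System.out.println(viaConstructor.getProp("group.id")); // demo-group
        System.out.println(viaBuild.getProp("group.id"));       // demo-group
    }
}
```

Both paths produce an equivalent configuration, which is why keeping the builder does not have to get in the way of declarative topology definitions.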


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124594357
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will use
    +         * @param subscription The subscription defining which topics and partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) {
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at runtime,
    -            // but because some translators will work no matter what the generics
    -            // are I thought it best not to force someone to reset the translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a value.deserializer
    -         * This is the same as setting value.deserializer, but overrides it.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
              */
             public Builder<K,V> setProp(String key, Object value) {
                 kafkaProps.put(key, value);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Map<String, Object> props) {
                 kafkaProps.putAll(props);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Properties props) {
                 for (String name: props.stringPropertyNames()) {
    --- End diff --
    
    This code has a bug: it only picks up properties for which both the key and the value are Strings. It should also pick up key/value pairs that are String/Object. I suggest we refactor the code as follows:
    
    ```java
    public Builder<K,V> setProp(Properties props) {
        props.forEach((key, value) -> {
            if (key instanceof String) {
                kafkaProps.put((String) key, value);
            }
        });
        return this;
    }
    ```
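
To make the reported behaviour concrete, here is a small stand-alone sketch comparing the two approaches. The class `SetPropSketch` and both helper names are hypothetical. The loop body of the reviewed method is not visible in the diff, so the `getProperty` call in `copyStringOnly` is an assumption; the filtering itself comes from `Properties.stringPropertyNames()`, which only returns keys whose values are also Strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SetPropSketch {

    // Mirrors the loop under review: stringPropertyNames() only returns keys
    // whose value is also a String, so entries with other value types are skipped.
    static Map<String, Object> copyStringOnly(Properties props) {
        Map<String, Object> kafkaProps = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            kafkaProps.put(name, props.getProperty(name)); // assumed loop body
        }
        return kafkaProps;
    }

    // Mirrors the suggested refactoring: keep every entry that has a String
    // key, whatever the type of its value.
    static Map<String, Object> copyStringKeys(Properties props) {
        Map<String, Object> kafkaProps = new HashMap<>();
        props.forEach((key, value) -> {
            if (key instanceof String) {
                kafkaProps.put((String) key, value);
            }
        });
        return kafkaProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "demo-group");  // String -> String
        props.put("max.poll.records", 500);   // String -> Integer

        System.out.println(copyStringOnly(props).containsKey("max.poll.records")); // false
        System.out.println(copyStringKeys(props).containsKey("max.poll.records")); // true
    }
}
```

The Integer-valued entry is silently lost by the first helper and kept by the second, which is the difference the review comment points out.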
    




[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124580583
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will use
    +         * @param subscription The subscription defining which topics and partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) {
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at runtime,
    -            // but because some translators will work no matter what the generics
    -            // are I thought it best not to force someone to reset the translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a value.deserializer
    -         * This is the same as setting value.deserializer, but overrides it.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
    --- End diff --
    
    Set a Kafka consumer property



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    Thanks for the reviews everyone.



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r121728052
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -79,16 +78,46 @@
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    +     * Users who need different deserializers should use the Builder constructors instead, 
    +     * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
    +     * @param bootstrapServers The bootstrap servers for the consumer
    +     * @param topics The topics to subscribe to
    +     * @return The new builder
    +     */
         public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
    -        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +        return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
         }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    +     * Users who need different deserializers should use the Builder constructors instead, 
    +     * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
    +     * @param bootstrapServers The bootstrap servers for the consumer
    +     * @param topics The topics to subscribe to
    +     * @return The new builder
    +     */
         public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
    -        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +        return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
         }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    +     * Users who need different deserializers should use the Builder constructors instead, 
    +     * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
    +     * @param bootstrapServers The bootstrap servers for the consumer
    +     * @param topics The topic pattern to subscribe to
    +     * @return The new builder
    +     */
         public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
    -        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +        return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
    +    }
    +    
    +    private static Builder<String, String> setDefaultStringDeserializers(Builder<String, String> builder) {
    +        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    --- End diff --
    
    This is just here to support the 3 convenience builder() functions above. I'll rename it so it doesn't imply that String is the default type.



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124572749
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -79,16 +78,46 @@
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
         
    +    /**
    +     * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
    --- End diff --
    
    NIT: I would suggest a more succinct javadoc, e.g. "Factory method that creates a Builder with key/val String deserializers.". In my opinion the text mentioning how to use builders for other ser/des should go in the README.
    
    Same comment would apply to other Builder factory methods.
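
    For reference, the README-style guidance mentioned above could look roughly like the following sketch. It shows a user picking non-String deserializers through the plain Builder constructor plus setProp, next to a String convenience factory. SketchSpoutConfigBuilder and stringBuilder are hypothetical stand-ins (not the real KafkaSpoutConfig.Builder) so that the example compiles without Kafka or Storm on the classpath. The property keys are the standard Kafka consumer keys, written as string literals here instead of the ConsumerConfig constants.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for KafkaSpoutConfig.Builder; only the parts needed for the sketch.
class SketchSpoutConfigBuilder<K, V> {
    private final Map<String, Object> kafkaProps = new HashMap<>();
    private final List<String> topics;

    SketchSpoutConfigBuilder(String bootstrapServers, String... topics) {
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
        }
        kafkaProps.put("bootstrap.servers", bootstrapServers);
        this.topics = Arrays.asList(topics);
    }

    // Every Kafka consumer setting, deserializers included, goes through this one method.
    SketchSpoutConfigBuilder<K, V> setProp(String key, Object value) {
        kafkaProps.put(key, value);
        return this;
    }

    Map<String, Object> getKafkaProps() {
        return kafkaProps;
    }

    List<String> getTopics() {
        return topics;
    }

    // Convenience factory for String keys and values (hypothetical name).
    static SketchSpoutConfigBuilder<String, String> stringBuilder(String bootstrapServers, String... topics) {
        return new SketchSpoutConfigBuilder<String, String>(bootstrapServers, topics)
            .setProp("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }
}

class DeserializerConfigExample {
    public static void main(String[] args) {
        // Users who need other types call the constructor and set the deserializers themselves.
        SketchSpoutConfigBuilder<byte[], byte[]> builder =
            new SketchSpoutConfigBuilder<byte[], byte[]>("localhost:9092", "my-topic")
                .setProp("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                .setProp("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        System.out.println(builder.getKafkaProps());
    }
}
```

    With this shape the builder needs no deserializer-specific constructors or setters, which is the duplication the PR removes.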



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @hmcl Added a field for the offset strategy default and reordered the KafkaSpoutConfig contents as you suggested.
    
    I didn't mean for the branches to contain each other; I accidentally reset a branch to the wrong content. It should be fixed now. Once this or STORM-2541 goes in, I'll fix any conflicts that pop up in the other PR.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    Still +1.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    +1. @harshach @priyank5485 can you please take one final look? If you don't have any objections, I suggest that we merge this patch in the next day or so.



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @srdo thanks for addressing the code comments. It LGTM, but I forgot to publish the following comment. Sorry for the extra overhead. Do you want to address it in this patch as well?
    
    Most of the defaults in [here](https://github.com/srdo/storm/blob/73adaa983cc421c7974e95011030c54f75e7698a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L38-#L51) are also in [here](https://github.com/srdo/storm/blob/73adaa983cc421c7974e95011030c54f75e7698a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L135-#L140). I think it would be better to have the defaults only in one place, ideally in the top level class. However, if it makes the code simpler to leave the defaults only in the Builder class, so be it. Nevertheless, we should avoid having them in both places.
    
    I would also like to ask if you can squash all the commits once we are done addressing everything, so that we can keep the git log clean. Thanks a lot.



[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124604957
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) {
    -            this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) {
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will use
    +         * @param subscription The subscription defining which topics and partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) {
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at runtime,
    -            // but because some translators will work no matter what the generics
    -            // are I thought it best not to force someone to reset the translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a value.deserializer
    -         * This is the same as setting value.deserializer, but overrides it.  If you have
    -         * set a custom RecordTranslator before calling this it may result in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
              */
             public Builder<K,V> setProp(String key, Object value) {
                 kafkaProps.put(key, value);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Map<String, Object> props) {
                 kafkaProps.putAll(props);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Properties props) {
                 for (String name: props.stringPropertyNames()) {
    --- End diff --
    
    Never mind, that won't work since KafkaSpoutConfig enforces the String type on keys. I'd at least like to throw an exception if the input properties contain a non-String key.
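
    A minimal sketch of that kind of check, using only the JDK, might look like the code below. The class and method names (PropertiesCopySketch, toStringKeyedMap) are hypothetical and not part of the PR. The background is that Properties.stringPropertyNames() omits entries whose key or value is not a String, so a loop over it silently drops them. The sketch iterates entrySet() instead and fails loudly. One caveat: entrySet() does not include defaults inherited from a parent Properties object, whereas stringPropertyNames() does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class PropertiesCopySketch {
    // Copies props into a String-keyed map and rejects any non-String key
    // instead of silently skipping it.
    static Map<String, Object> toStringKeyedMap(Properties props) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            Object key = entry.getKey();
            if (!(key instanceof String)) {
                throw new IllegalArgumentException(
                    "Kafka consumer property keys must be Strings, got: " + key);
            }
            result.put((String) key, entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "example-group");
        System.out.println(toStringKeyedMap(props));

        // Properties inherits put(Object, Object) from Hashtable, so this compiles.
        props.put(42, "oops");
        try {
            toStringKeyedMap(props);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

    Values are copied as-is, so non-String values such as Class objects survive the copy.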



[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2155
  
    @hmcl I'm not sure how we can do that. The fields in Builder are not static, so if we move the default definitions there, we'd have to create a builder and fish out the default values in the tests. The defaults also wouldn't be visible on the KafkaSpoutConfig class, which would probably be a minus. I don't see a way to have the defaults solely in the KafkaSpoutConfig class with no references to them from Builder, since the Builder fields have to get the default values if nothing else is set.
    
    We could move the FirstPollOffsetStrategy default into a static field in KafkaSpoutConfig like the rest for consistency at least?
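
    The arrangement discussed here can be sketched as follows: each default is defined once as a static constant on the outer config class, and the Builder fields are merely initialized from those constants. SketchConfig, its field names, and the numeric values are illustrative placeholders, not the actual KafkaSpoutConfig contents.

```java
class SketchConfig {
    enum FirstPollOffsetStrategy { UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    // Single definition of each default, visible to users and tests (placeholder values).
    static final long DEFAULT_POLL_TIMEOUT_MS = 200;
    static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
    static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY =
        FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;

    private final long pollTimeoutMs;
    private final long offsetCommitPeriodMs;
    private final FirstPollOffsetStrategy firstPollOffsetStrategy;

    private SketchConfig(Builder builder) {
        this.pollTimeoutMs = builder.pollTimeoutMs;
        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    }

    long getPollTimeoutMs() { return pollTimeoutMs; }
    long getOffsetCommitPeriodMs() { return offsetCommitPeriodMs; }
    FirstPollOffsetStrategy getFirstPollOffsetStrategy() { return firstPollOffsetStrategy; }

    static class Builder {
        // The Builder references the constants; it does not repeat the literal values.
        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
        private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;

        Builder setPollTimeoutMs(long pollTimeoutMs) {
            this.pollTimeoutMs = pollTimeoutMs;
            return this;
        }

        Builder setFirstPollOffsetStrategy(FirstPollOffsetStrategy strategy) {
            this.firstPollOffsetStrategy = strategy;
            return this;
        }

        SketchConfig build() {
            return new SketchConfig(this);
        }
    }

    public static void main(String[] args) {
        SketchConfig config = new SketchConfig.Builder().build();
        System.out.println(config.getPollTimeoutMs() == DEFAULT_POLL_TIMEOUT_MS);
    }
}
```

    A test can then compare a freshly built config against SketchConfig.DEFAULT_POLL_TIMEOUT_MS directly, without creating a builder just to read its initial field values.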

