You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/01/09 17:00:35 UTC

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/1868

    STORM-2225: change spout config to be simpler. (1.x)

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2225-1.x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1868
    
----
commit 95883ac6f0202367b7a7f47eaa50ddeef824dc27
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2016-11-30T03:39:26Z

    STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client
    STORM-2225: change spout config to be simpler.
    STORM-2228: removed ability to request a single topic go to multiple streams
    
    Conflicts:
    	examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
    	external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java

commit 2e041c3af64143157fd2e7a0af419685857e8083
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2016-12-08T19:26:49Z

    fixed some issues with rebase
    
    Conflicts:
    	external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java

commit dbf040082357e585bd3aba94ed31d25e8c5f3ea9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2016-12-08T22:10:12Z

    addressed review comments
    
    Conflicts:
    	external/storm-kafka-client/README.md

commit f95fc1670c6c03e27082d59555e26560636d15a9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2016-12-08T22:12:43Z

    oops

commit b8e32fceefaa1e71b4e4dec6e67d0a126761e949
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-01-06T22:37:31Z

    STORM-2225: make the core API java7 compatible

commit 38f4ede2899145b4f3b527a746e7cd0999e9bb46
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-01-06T22:42:31Z

    STORM-2225: addressed doc review comments

commit 853c524313cf1375499d3a9ccb0ec5a3509a7ae8
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-01-09T16:11:41Z

    STORM-2225: java7 modifications

commit 89eb16faef264057efebb6eb19f4a8089dd820a9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-01-09T16:58:27Z

    STORM-2225: Updated docs

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1868#discussion_r95682402
  
    --- Diff: examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java ---
    @@ -18,87 +18,58 @@
     
     package org.apache.storm.kafka.trident;
     
    +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.storm.Config;
     import org.apache.storm.StormSubmitter;
     import org.apache.storm.generated.AlreadyAliveException;
     import org.apache.storm.generated.AuthorizationException;
     import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.kafka.spout.Func;
     import org.apache.storm.kafka.spout.KafkaSpoutConfig;
     import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
     import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
    -import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    -import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
    -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
    -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
    -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
    -import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager;
     import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -import java.util.Arrays;
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.TimeUnit;
    -
    -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    -
     public class TridentKafkaClientWordCountNamedTopics {
         private static final String TOPIC_1 = "test-trident";
         private static final String TOPIC_2 = "test-trident-1";
         private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
     
         private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
    -        return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
    -                        newKafkaSpoutConfig(
    -                        newKafkaSpoutStreams())));
    +        return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
         }
     
    -    private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
    -        return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
    -                    kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
    +	private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
    +		@Override
    +		public List<Object> apply(ConsumerRecord<String, String> record) {
    +			return new Values(record.value());
    +		}
    +	};
    +    
    +    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
    +        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_1, TOPIC_2)
    --- End diff --
    
    This is an exact translation of the original code. Even down to not using KAFKA_LOCAL_BROKER.  If people want me to change it I am happy to, but I thought it best to not overreach on the scope of the pull request.  At least until the code worked.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by ppoulosk <gi...@git.apache.org>.
Github user ppoulosk commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1868#discussion_r95681580
  
    --- Diff: examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java ---
    @@ -18,87 +18,58 @@
     
     package org.apache.storm.kafka.trident;
     
    +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.storm.Config;
     import org.apache.storm.StormSubmitter;
     import org.apache.storm.generated.AlreadyAliveException;
     import org.apache.storm.generated.AuthorizationException;
     import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.kafka.spout.Func;
     import org.apache.storm.kafka.spout.KafkaSpoutConfig;
     import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
     import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
    -import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    -import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
    -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
    -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
    -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
    -import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager;
     import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -import java.util.Arrays;
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.TimeUnit;
    -
    -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    -
     public class TridentKafkaClientWordCountNamedTopics {
         private static final String TOPIC_1 = "test-trident";
         private static final String TOPIC_2 = "test-trident-1";
         private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
     
         private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
    -        return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
    -                        newKafkaSpoutConfig(
    -                        newKafkaSpoutStreams())));
    +        return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
         }
     
    -    private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
    -        return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
    -                    kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
    +	private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
    +		@Override
    +		public List<Object> apply(ConsumerRecord<String, String> record) {
    +			return new Values(record.value());
    +		}
    +	};
    +    
    +    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
    +        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_1, TOPIC_2)
    --- End diff --
    
    Why are we using a hardcoded port here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    @revans2 
    Seems like the commit for JDK 7 is removed so Travis CI shows compile error on JDK 7. Could you address this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    This is the 1.x version of #1808 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    This branch is also in sync with #1808 now tabs are removed and code is squashed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1868#discussion_r96109547
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/StormStringDeserializer.java ---
    @@ -15,22 +15,11 @@
      *   See the License for the specific language governing permissions and
      *   limitations under the License.
      */
    -
     package org.apache.storm.kafka.spout;
     
    -import org.apache.kafka.clients.consumer.ConsumerRecord;
    -
    -import java.util.List;
    -
    -public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> {
    -    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
    +import org.apache.kafka.common.serialization.StringDeserializer;
     
    -    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
    -        this.tupleBuilder = tupleBuilder;
    -    }
    +public class StormStringDeserializer extends StringDeserializer implements SerializableDeserializer<String> {
    --- End diff --
    
    Do we still need this? users can configure the StringSerializer from kafka-clients lib


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    The test failures are unrelated and are around the integration tests that always seem to fail lately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    +1. @revans2 can you squash the commits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1868


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    @revans2 overall looks good to me. Minor nits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by ppoulosk <gi...@git.apache.org>.
Github user ppoulosk commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    +1, Non-binding.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    @harshach and @ppoulosk I just pushed fixes for your review comments.  @harshach you were right I could and did remove StormStringDeserializer.  I think this made the code much better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    It was two jdk8 annotations that I didn't remove as part of rework/upmerging.  I'll get them


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1868#discussion_r96432779
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/StormStringDeserializer.java ---
    @@ -15,22 +15,11 @@
      *   See the License for the specific language governing permissions and
      *   limitations under the License.
      */
    -
     package org.apache.storm.kafka.spout;
     
    -import org.apache.kafka.clients.consumer.ConsumerRecord;
    -
    -import java.util.List;
    -
    -public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> {
    -    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
    +import org.apache.kafka.common.serialization.StringDeserializer;
     
    -    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
    -        this.tupleBuilder = tupleBuilder;
    -    }
    +public class StormStringDeserializer extends StringDeserializer implements SerializableDeserializer<String> {
    --- End diff --
    
    Yes we need this even more now.  The Kafka Deserializer (including StringDeserializer) is not java serializable.  So if we don't do this on a real storm cluster we will get exceptions when we try to write out the spout.
    
    I can look into trying to support some kind of generics like 
    ```
    public <NK> Builder<NK,V> setKey(Class<? extends Deserializer<NK>> keyDeserializer) {
    ```
    But I really don't know if that works.  I'll try it out and let you know.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1868#discussion_r96109387
  
    --- Diff: external/storm-kafka-client/README.md ---
    @@ -1,192 +1,5 @@
    -#Storm Kafka Spout with New Kafka Consumer API
    +#Storm Kafka integration using the kafka-client jar
    --- End diff --
    
    can we call it as Kafka New consumer API. Not many understand its using newer api by calling it as "kafka-client" jar


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    @harshach I am happy to once I get a +1 on #1808 too.  It has almost identical code, but is for master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    @HeartSaVioR I removed the unneeded annotation and rebuilt with JDK7


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

Posted by ppoulosk <gi...@git.apache.org>.
Github user ppoulosk commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1868#discussion_r95689831
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java ---
    @@ -0,0 +1,49 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.bolt.selector;
    +
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Uses field with a given index to select the topic name from a tuple .
    + */
    +public class FieldIndexTopicSelector implements KafkaTopicSelector {
    +    private static final long serialVersionUID = -3830575380208166367L;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FieldIndexTopicSelector.class);
    +
    +    private final int fieldIndex;
    +    private final String defaultTopicName;
    +
    +    public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
    +        this.fieldIndex = fieldIndex;
    +        this.defaultTopicName = defaultTopicName;
    +    }
    +
    +    @Override
    +    public String getTopic(Tuple tuple) {
    +        if (fieldIndex < tuple.size()) {
    --- End diff --
    
    Protect against negative index?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

Posted by ppoulosk <gi...@git.apache.org>.
Github user ppoulosk commented on the issue:

    https://github.com/apache/storm/pull/1868
  
    Thanks, @revans2.  Still +1, NB.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---