Posted to user@storm.apache.org by Milind Vaidya <ka...@gmail.com> on 2018/09/17 22:16:35 UTC

Transition from kafka 0.8 to 0.10

Hi,

We had been using Kafka 0.8 with Storm. We have since upgraded to
kafka_2.11-0.10.0.1 and Storm 1.1.1. Although the libraries changed, the
code stayed pretty much the same.

Now we are trying to upgrade to Storm 1.2.2 and also look into
KafkaSpoutRetryService. This also means using the new KafkaSpoutConfig.

What I fail to understand is where to set ZooKeeper-related properties
such as zkRoot in this new config. I also did not find any way to set the
following properties:


public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

Re: Transition from kafka 0.8 to 0.10

Posted by Stig Rohde Døssing <st...@gmail.com>.
See https://issues.apache.org/jira/browse/STORM-2992.

On Tue, Oct 2, 2018 at 01:01, Milind Vaidya <ka...@gmail.com> wrote:

> Hi Stig,
>
> Thanks for that response. I was able to set up the Kafka-Storm integration
> in QA without ZooKeeper. I have a specific question about
> FirstPollOffsetStrategy. The table at the end of the link you sent
> explains how startOffsetTime and forceFromStart map to
> FirstPollOffsetStrategy. But the old KafkaSpout also accepted a Unix
> timestamp (seconds since the epoch) as a valid value. Combined with
> ignoring the offsets stored in ZK, this made it convenient to choose the
> starting Kafka offsets at a finer granularity. The new spout has only 4
> such options. Is there a way to achieve timestamp-based consumption? When
> I describe the Kafka consumer group from the command line, no timestamp
> appears among the fields.

Re: Transition from kafka 0.8 to 0.10

Posted by Milind Vaidya <ka...@gmail.com>.
Hi Stig,

Thanks for that response. I was able to set up the Kafka-Storm integration
in QA without ZooKeeper. I have a specific question about
FirstPollOffsetStrategy. The table at the end of the link you sent
explains how startOffsetTime and forceFromStart map to
FirstPollOffsetStrategy. But the old KafkaSpout also accepted a Unix
timestamp (seconds since the epoch) as a valid value. Combined with
ignoring the offsets stored in ZK, this made it convenient to choose the
starting Kafka offsets at a finer granularity. The new spout has only 4
such options. Is there a way to achieve timestamp-based consumption? When
I describe the Kafka consumer group from the command line, no timestamp
appears among the fields.
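
To make the timestamp-based start position concrete: in the Kafka consumer
API (0.10.1 and later), KafkaConsumer.offsetsForTimes returns, per
partition, the earliest offset whose timestamp is greater than or equal to
a given timestamp in epoch milliseconds. The sketch below only simulates
that lookup rule over an in-memory list and does not call Kafka. The class
name, the method name, and the assumption that timestamps never decrease
within a partition are all hypothetical simplifications.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class TimestampSeek {

    // Hypothetical helper, not part of Kafka or Storm.
    // timestampsMs.get(i) is the timestamp (epoch millis) of the record at
    // offset i; the list is assumed to be non-decreasing.
    // Returns the earliest offset whose timestamp is >= targetMs, or empty
    // if every record is older than targetMs.
    public static OptionalLong firstOffsetAtOrAfter(List<Long> timestampsMs, long targetMs) {
        int lo = 0;
        int hi = timestampsMs.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestampsMs.get(mid) < targetMs) {
                lo = mid + 1;   // record at mid is too old, look to the right
            } else {
                hi = mid;       // record at mid qualifies, look for an earlier one
            }
        }
        return lo == timestampsMs.size() ? OptionalLong.empty() : OptionalLong.of(lo);
    }

    public static void main(String[] args) {
        // Made-up partition contents for illustration only.
        List<Long> partition = Arrays.asList(1000L, 2000L, 2000L, 5000L);

        // The old spout setting is described above in seconds since the
        // epoch; Kafka record timestamps are in milliseconds, so convert.
        long startSeconds = 2;
        long startMs = startSeconds * 1000L;

        System.out.println(firstOffsetAtOrAfter(partition, startMs));
    }
}
```

Whether the spout can be told to start from such an offset depends on the
Storm version in use, so treat this only as an illustration of the lookup.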


On Mon, Sep 17, 2018 at 10:45 PM Stig Rohde Døssing <st...@gmail.com>
wrote:

> The storm-kafka-client spout is a complete rewrite. The new spout stores
> offsets in Kafka instead of Zookeeper, so you don't need to set any
> Zookeeper configuration. You will need to migrate your committed offsets
> using
> https://github.com/apache/storm/tree/master/external/storm-kafka-migration
> though.
>
> Regarding how to set the properties you listed, take a look at
> https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html.
> There's a table at the bottom that compares the two spout configs. Most of
> the configs are now set using KafkaConsumer properties, which you can set
> using KafkaSpoutConfig.setProp. The properties are documented at
> https://kafka.apache.org/documentation/#newconsumerconfigs.

Re: Transition from kafka 0.8 to 0.10

Posted by Stig Rohde Døssing <st...@gmail.com>.
The storm-kafka-client spout is a complete rewrite. The new spout stores
offsets in Kafka instead of Zookeeper, so you don't need to set any
Zookeeper configuration. You will need to migrate your committed offsets
using
https://github.com/apache/storm/tree/master/external/storm-kafka-migration
though.

Regarding how to set the properties you listed, take a look at
https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html.
There's a table at the bottom that compares the two spout configs. Most of
the configs are now set using KafkaConsumer properties, which you can set
using KafkaSpoutConfig.setProp. The properties are documented at
https://kafka.apache.org/documentation/#newconsumerconfigs.
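
As a rough illustration of the point about KafkaConsumer properties, the
sketch below collects three of the old fields under consumer property
names. The pairing (fetchSizeBytes to max.partition.fetch.bytes,
fetchMaxWait to fetch.max.wait.ms, bufferSizeBytes to receive.buffer.bytes)
is my reading of the comparison table and should be checked against it. The
class and method names are hypothetical, and the code uses only the JDK, so
the call to KafkaSpoutConfig.setProp appears only in a comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LegacySpoutProps {

    // Hypothetical helper: translates three old SpoutConfig fields into
    // KafkaConsumer property names. The name mapping is an assumption to
    // verify against the storm-kafka-client comparison table.
    public static Map<String, Object> toConsumerProps(int fetchSizeBytes,
                                                      int fetchMaxWaitMs,
                                                      int bufferSizeBytes) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("max.partition.fetch.bytes", fetchSizeBytes);
        props.put("fetch.max.wait.ms", fetchMaxWaitMs);
        props.put("receive.buffer.bytes", bufferSizeBytes);
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the old defaults listed in the question.
        Map<String, Object> props = toConsumerProps(1024 * 1024, 10000, 1024 * 1024);

        // With storm-kafka-client on the classpath, each entry would be
        // passed to the spout config builder via setProp(key, value).
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Fields with no KafkaConsumer counterpart, such as the ZooKeeper settings,
are simply left out of the map.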

On Tue, Sep 18, 2018 at 00:17, Milind Vaidya <ka...@gmail.com> wrote:

> Hi,
>
> We had been using Kafka 0.8 with Storm. We have since upgraded to
> kafka_2.11-0.10.0.1 and Storm 1.1.1. Although the libraries changed, the
> code stayed pretty much the same.
>
> Now we are trying to upgrade to Storm 1.2.2 and also look into
> KafkaSpoutRetryService. This also means using the new KafkaSpoutConfig.
>
> What I fail to understand is where to set ZooKeeper-related properties
> such as zkRoot in this new config. I also did not find any way to set the
> following properties:
>
> public int fetchSizeBytes = 1024 * 1024;
> public int socketTimeoutMs = 10000;
> public int fetchMaxWait = 10000;
> public int bufferSizeBytes = 1024 * 1024;
> public MultiScheme scheme = new RawMultiScheme();
> public boolean ignoreZkOffsets = false;
> public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
> public long maxOffsetBehind = Long.MAX_VALUE;
> public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
> public int metricsTimeBucketSizeInSecs = 60;