Posted to user@storm.apache.org by Nasron Cheong <na...@gmail.com> on 2017/12/19 20:59:41 UTC

Migrating from storm-kafka to storm-kafka-client

Hi,

I'm trying to determine the steps for migrating to storm-kafka-client in
order to use the new Kafka client.

It's not quite clear to me how offsets are migrated - is there a specific
set of steps to ensure offsets are moved from the ZK-based storage into the
Kafka-based storage?

Or is the original configuration respected, so that storm-kafka-client can
mostly be a drop-in replacement?

I want to avoid having spouts reset to the beginning of topics after
deployment because of this change.

Thanks.

- Nasron

Re: Migrating from storm-kafka to storm-kafka-client

Posted by Stig Rohde Døssing <sr...@apache.org>.
In case you still need a tool for this, I added one here
https://github.com/apache/storm/pull/2560.

Re: Migrating from storm-kafka to storm-kafka-client

Posted by Stig Rohde Døssing <sr...@apache.org>.
There's a small discrepancy: in the old spout we track the offset the next
batch should start at, while in storm-kafka-client we track the last
emitted offset. So for example, if offsets 0, 1 and 2 were emitted, the old
spout would store offset = 0, nextOffset = 3. The new spout would store
firstOffset = 0, lastOffset = 2. So I think it should be lastOffset =
nextOffset - 1. Other than that, I agree with your mapping.
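
As a minimal sketch of that mapping (the function name is made up for
illustration, and only the two offset fields discussed in this thread are
handled; any other properties in the stored metadata are ignored here):

```python
def to_client_offsets(old_meta):
    """Map storm-kafka batch metadata to the storm-kafka-client field names.

    Hypothetical helper, not part of either spout.
    """
    return {
        "firstOffset": old_meta["offset"],
        # The old spout stores where the next batch starts; the new spout
        # stores the last offset that was emitted, so subtract one.
        "lastOffset": old_meta["nextOffset"] - 1,
    }


# Offsets 0, 1 and 2 were emitted by the old spout.
print(to_client_offsets({"offset": 0, "nextOffset": 3}))
```

An empty old batch (offset equal to nextOffset) would yield a lastOffset
below firstOffset. How the new spout treats that case is not covered in
this thread, so it is worth checking before relying on this.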

Re: Migrating from storm-kafka to storm-kafka-client

Posted by Nasron Cheong <na...@gmail.com>.
Hi Stig,

That's great! Thanks for all the info. Looking through the code, one small
detail is the difference between storm-kafka-client's format and
storm-kafka. The former uses 'firstOffset' and 'lastOffset' and the latter
uses 'offset' and 'nextOffset'.

So, can I map with

firstOffset = offset

and

lastOffset = nextOffset+1 ?

Looking through the code, it seems that nextOffset is placed after the
last consumed message, but I'm not sure.

Thanks

- Nasron

Re: Migrating from storm-kafka to storm-kafka-client

Posted by Stig Rohde Døssing <sr...@apache.org>.
Nasron,

Okay, migrating a Trident spout is a very different thing. Trident spouts
store their state in Storm's Zookeeper (unless you decide otherwise by
setting transactional.zookeeper.servers in storm.yaml). This also applies
to the storm-kafka-client Trident spout, so we won't need to move offsets
into Kafka.

The idea of stopping all the producers and starting at LATEST (or
UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk of
skipping tuples. In order to get Trident to commit something, you have to
deploy the new topology with LATEST and start the producers again, wait
until at least one commit happens, and then take the topology back down and
redeploy with whatever your first poll strategy normally is. If the worker
crashes before the spout manages to commit something, you will skip tuples.

If you don't want to do that, here are my notes on storm-kafka ->
storm-kafka-client for Trident:

The storage formats and zk paths for the two spouts are a little different.
Both spouts store their state as JSON maps, but some of the keys are
different. I use ${} below to indicate variable substitution.

The root path (in the following: zkRoot) for your spout's data is
/${transactional.zookeeper.root from storm.yaml}/${txId you set with
TridentTopology.newStream}/user.
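
For illustration, the root path could be assembled like this. This is a
sketch only: the helper name is made up, and the example values
(/transactional as the configured root and my-spout as the txId) are
assumptions, not values from this thread.

```python
import posixpath


def zk_root(transactional_root, tx_id):
    """Build ${zkRoot} from transactional.zookeeper.root and the spout's txId."""
    # Strip slashes so both "/transactional" and "transactional" work.
    return posixpath.join("/", transactional_root.strip("/"), tx_id, "user")


print(zk_root("/transactional", "my-spout"))
```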

For the storm-kafka spout the offsets are stored in one of the following
two paths:
${zkRoot}/${topicName}partition_${partition} if you are using wildcard
topic subscriptions
${zkRoot}/partition_${partition} otherwise

The storage format for storm-kafka is as follows:
{ "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2 } }
if you are using wildcard topic subscriptions
{ "partition_${partition}": {"offset": 0, "nextOffset": 2 } }
otherwise (I left out some irrelevant properties)

For storm-kafka-client the zk path is
${zkRoot}/${topicName}@${partition}

and the storage format is
{ "${topicName}@${partition}": {"firstOffset": 0, "lastOffset": 1 } }
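
The path naming described above can be sketched as follows. The helper
names, the example topic and the zkRoot value are hypothetical; only the
node name patterns come from the notes above.

```python
def old_node_name(topic, partition, wildcard):
    """storm-kafka node name: topic prefix only with wildcard subscriptions."""
    prefix = topic if wildcard else ""
    return f"{prefix}partition_{partition}"


def new_node_name(topic, partition):
    """storm-kafka-client node name: ${topicName}@${partition}."""
    return f"{topic}@{partition}"


def node_paths(zk_root, topic, partition, wildcard=False):
    """Return (old_path, new_path) for one topic partition."""
    old_path = f"{zk_root}/{old_node_name(topic, partition, wildcard)}"
    new_path = f"{zk_root}/{new_node_name(topic, partition)}"
    return old_path, new_path


old_path, new_path = node_paths("/transactional/my-spout/user", "events", 3)
print(old_path)
print(new_path)
```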

In order to migrate from storm-kafka to storm-kafka-client, we need to stop
the topology and run a script that moves the offsets from the old
location/format to the new location/format. There's no way to tell Trident
to read from one path/format and write to another, so it has to be done
offline. Once the offsets are migrated, the spout can be replaced in the
topology and the topology can be redeployed.
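
A rough sketch of such an offline step for a single znode is below. It
assumes the znode value is UTF-8 encoded JSON as described above, and that
`client` is any object with kazoo-style `get`, `exists`, `create` and `set`
methods (for example a started `kazoo.client.KazooClient`). The function
name and the `convert` callback are hypothetical. Something like this
should only be run while the topology is stopped.

```python
import json


def migrate_node(client, old_path, new_path, convert):
    """Copy one spout state znode to its new location in the new format.

    client  -- kazoo-style ZooKeeper client (an assumption, see above)
    convert -- function mapping the old JSON map to the new JSON map
    """
    raw, _stat = client.get(old_path)
    old_state = json.loads(raw.decode("utf-8"))
    new_raw = json.dumps(convert(old_state)).encode("utf-8")

    if client.exists(new_path):
        client.set(new_path, new_raw)
    else:
        client.create(new_path, new_raw, makepath=True)
    # The old znode is left in place so the step can be re-run or rolled back.
    return new_raw
```

The old znode is deliberately not deleted here, so going back to the old
spout stays possible if the migration has to be repeated.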

I might look at writing an application that can do this at some point, but
it might take me a while. If you'd like to look at it yourself, here are some
pointers on where to start:
* This is where the offsets are written to Zookeeper, assuming you use an
opaque spout:
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L184
You might want to look at this class for a bit (particularly the emit
function), because it's pretty useful for understanding how/where Trident
stores metadata for spouts.
* The return value of
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L85
defines the format of what's being saved to Zookeeper for storm-kafka. It's
being wrapped in a map, so the full written value is
{ "${topicName}partition_${partition}": ${theReturnValue} } (see the storage
format note above; it's different if you're not using wildcard
subscriptions).
* Similarly, for storm-kafka-client the return value of
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L106
defines the format of what that spout saves to Zookeeper (and expects to
find).
* You should use zkCli (it's in your zookeeper/bin directory) to explore
your Zookeeper filesystem. It should be pretty easy to find your offsets in
there with that tool.

Sorry about the wall of text, this turned out to have a lot of detail to
cover.

Re: Migrating from storm-kafka to storm-kafka-client

Posted by Nasron Cheong <na...@gmail.com>.
Thanks Stig,

So after some digging, I realized we are really migrating from the Kafka
Trident emitter in storm-kafka to the Trident emitter in
storm-kafka-client.

As far as I can see, the offset information is still stored in ZooKeeper.
For storm-kafka, the offset info is written here:
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L140

However this seems quite different from storm-kafka-client, which uses
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java#L56

I'm not sure under which znode this information is stored, or whether the
znode itself differs between the two implementations.

Looks like I need a tool to copy the stored values in zk from old
storm-kafka to storm-kafka-client?
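
A minimal sketch of what such a copy tool would have to do per batch. It
assumes the old emitter's metadata carries 'offset' and 'nextOffset', the new
one carries 'firstOffset' and 'lastOffset', and nextOffset points one past the
last emitted message. The class and method names are hypothetical and not part
of either library, and reading or writing the ZooKeeper nodes is left out.

```java
/**
 * Sketch only: converts the batch metadata kept by the old storm-kafka Trident
 * emitter into the shape assumed for storm-kafka-client.
 */
final class TridentOffsetMapping {

    /** Holds the two fields the new format is assumed to use (both inclusive). */
    static final class NewMetadata {
        final long firstOffset;
        final long lastOffset;

        NewMetadata(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Old format: offset is the first offset of the batch and nextOffset is
     * where the next batch starts, so the last emitted offset is nextOffset - 1.
     */
    static NewMetadata fromOld(long offset, long nextOffset) {
        if (offset < 0 || nextOffset <= offset) {
            // Assumption: empty or malformed batches are rejected, not migrated.
            throw new IllegalArgumentException(
                "expected 0 <= offset < nextOffset, got " + offset + ", " + nextOffset);
        }
        return new NewMetadata(offset, nextOffset - 1);
    }

    public static void main(String[] args) {
        // A batch that emitted offsets 0, 1 and 2.
        NewMetadata m = fromOld(0L, 3L);
        System.out.println("firstOffset=" + m.firstOffset + " lastOffset=" + m.lastOffset);
    }
}
```

For a batch that emitted offsets 0, 1 and 2 this gives firstOffset = 0 and
lastOffset = 2.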

Another option I suppose is to:
- stop topic producers
- run the old code until it drains all topics
- start new code with FirstPollOffsetStrategy.LATEST

Although this seems risky.
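
Part of the risk is switching over before the old topology has really caught
up. Here is a rough sketch of a check for that, assuming the log end offset and
the old spout's next-to-read offset can be obtained per partition by some other
means. Fetching them is not shown, and DrainCheck is a made-up name.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch only: decides whether every partition has been fully consumed. */
final class DrainCheck {

    /**
     * Returns partition -> number of unread messages, for partitions that
     * still have any. Both inputs map partition -> offset: the log end offset
     * (where the next produced message would go) and the consumer's
     * next-to-read offset.
     */
    static Map<Integer, Long> remainingLag(Map<Integer, Long> logEndOffsets,
                                           Map<Integer, Long> consumedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            Long consumed = consumedOffsets.get(e.getKey());
            // Assumption: a partition with no stored offset counts as fully
            // unread, which is a pessimistic upper bound.
            long behind = e.getValue() - (consumed == null ? 0L : consumed);
            if (behind > 0) {
                lag.put(e.getKey(), behind);
            }
        }
        return lag;
    }

    static boolean isDrained(Map<Integer, Long> logEndOffsets,
                             Map<Integer, Long> consumedOffsets) {
        return remainingLag(logEndOffsets, consumedOffsets).isEmpty();
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new TreeMap<>();
        end.put(0, 10L);
        end.put(1, 5L);
        Map<Integer, Long> consumed = new TreeMap<>();
        consumed.put(0, 10L);
        consumed.put(1, 3L);
        System.out.println("drained: " + isDrained(end, consumed));
        System.out.println("lag by partition: " + remainingLag(end, consumed));
    }
}
```

Anything produced between the last check and the start of the new topology
would still be skipped with LATEST, so producers have to stay stopped for the
whole switch.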

Thanks!

- Nasron



Re: Migrating from storm-kafka to storm-kafka-client

Posted by Manish Sharma <ma...@gmail.com>.
We solved this offset sync issue by making our topology idempotent (which
was possible for our use case).
Our Storm topology consumes documents from Kafka, indexes them into
Elasticsearch and inserts records into Cassandra.
The topology can re-consume from the beginning of the topic, because the doc
IDs and primary keys are chosen so that each record is overwritten with the
same document.
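
A small sketch of the idea, using an in-memory map as a stand-in for
Elasticsearch or Cassandra. The class name, the key format and the sample
records are invented for illustration. The point is only that the key comes
from the record itself, so a replay overwrites instead of duplicating.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: an in-memory stand-in for a keyed store (index or table). */
final class IdempotentSink {
    private final Map<String, String> store = new HashMap<>();

    /**
     * Builds the key from the record's own identity. Using arrival time or a
     * random UUID here would create a new entry on every replay.
     */
    static String docId(String type, String businessKey) {
        return type + ":" + businessKey;
    }

    /** Writing the same record again replaces the entry instead of adding one. */
    void upsert(String type, String businessKey, String body) {
        store.put(docId(type, businessKey), body);
    }

    int size() {
        return store.size();
    }

    String get(String type, String businessKey) {
        return store.get(docId(type, businessKey));
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        String[][] records = {
            {"order", "1001", "{\"total\":10}"},
            {"order", "1002", "{\"total\":25}"}
        };
        // Consume the same records twice, as if the spout had reset to the
        // beginning of the topic.
        for (int pass = 0; pass < 2; pass++) {
            for (String[] r : records) {
                sink.upsert(r[0], r[1], r[2]);
            }
        }
        System.out.println("entries after replay: " + sink.size());
    }
}
```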

cheers, /Manish



Re: Migrating from storm-kafka to storm-kafka-client

Posted by Stig Rohde Døssing <sr...@apache.org>.
Hi Nasron,

I don't believe there's currently a tool to help you migrate. We did it
manually by writing a small utility that looked up the committed offsets in
Storm's Zookeeper, opened a KafkaConsumer with the new consumer group id
and committed the offsets for the appropriate partitions. We stopped our
topologies, used this utility and redeployed with the new spout.
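
As a rough illustration of the middle step (not the utility we actually used),
the sketch below turns per-partition offsets read from ZooKeeper into the
values to commit. Kafka treats a committed offset as the position of the next
record to read, so the caller has to say which meaning the stored value has.
That choice, the class name and the "topic-partition" key format are
assumptions. A real utility would pass the result to KafkaConsumer.commitSync
as a Map<TopicPartition, OffsetAndMetadata>, using a consumer configured with
the new group id.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch only: plans the offsets to commit under the new consumer group. */
final class OffsetCommitPlan {

    /** What the number stored in ZooKeeper means (to be verified by the operator). */
    enum StoredAs { NEXT_TO_READ, LAST_PROCESSED }

    /**
     * Maps "topic-partition" -> offset to commit. If the stored value is the
     * last processed offset, the committed offset must be one higher.
     */
    static Map<String, Long> build(String topic,
                                   Map<Integer, Long> zkOffsets,
                                   StoredAs meaning) {
        Map<String, Long> plan = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : zkOffsets.entrySet()) {
            long stored = e.getValue();
            if (stored < 0) {
                throw new IllegalArgumentException(
                    "negative offset for partition " + e.getKey());
            }
            long toCommit = (meaning == StoredAs.LAST_PROCESSED) ? stored + 1 : stored;
            plan.put(topic + "-" + e.getKey(), toCommit);
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<Integer, Long> zkOffsets = new TreeMap<>();
        zkOffsets.put(0, 41L);
        zkOffsets.put(1, 7L);
        System.out.println(build("events", zkOffsets, StoredAs.LAST_PROCESSED));
    }
}
```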

Assuming there isn't already a tool for migration floating around
somewhere, I think we could probably build some migration support into the
storm-kafka-client spout. If the path to the old offsets in Storm's
Zookeeper is given, we might be able to extract them and start up the new
spout from there.
