Posted to users@kafka.apache.org by Steven Schlansker <ss...@opentable.com> on 2017/06/05 21:12:07 UTC

Reliably implementing global KeyValueStore#get

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
    RemoteInstance instance = selectPartition(key);
    return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
    return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is failing
over from that sibling, the request may still get routed there. By the time
the sibling handles it, it no longer "owns" the partition and returns a
false 'null'.

You can try re-checking after you get a null value, but that's susceptible
to the same race -- it's unlikely but possible that the data migrates *back*
before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.
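
For illustration, the primitive described above could look roughly like the
following. This is a minimal sketch with an in-memory toy store, not Kafka
Streams code: PartitionedStore, getOwned and NotOwnedException are
hypothetical names, and ownership is tracked by hand. The only point is that
the ownership check and the read happen under one lock, so a caller can tell
"key absent" (null) apart from "wrong instance" (exception).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types exist in Kafka Streams.
final class OwnedGetSketch {

    /** Signals that the key's partition is not owned at lookup time. */
    static final class NotOwnedException extends RuntimeException {
        NotOwnedException(int partition) {
            super("partition " + partition + " is not owned by this instance");
        }
    }

    /** Toy store: every method takes the same lock, so check + read is atomic. */
    static final class PartitionedStore {
        private final int numPartitions;
        private final Set<Integer> owned = new HashSet<>();
        private final Map<String, String> data = new HashMap<>();

        PartitionedStore(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        int partitionFor(String key) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }

        synchronized void assign(int partition) {
            owned.add(partition);
        }

        synchronized void revoke(int partition) {
            owned.remove(partition);
        }

        synchronized void put(String key, String value) {
            data.put(key, value);
        }

        /** Returns the value, or null only if an owned partition lacks the key. */
        synchronized String getOwned(String key) {
            int partition = partitionFor(key);
            if (!owned.contains(partition)) {
                throw new NotOwnedException(partition);
            }
            return data.get(key);
        }
    }
}
```

A caller that catches NotOwnedException knows to re-resolve the owner and
retry, instead of reporting the key as missing.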

Thoughts?

Thanks again,
Steven


Re: Reliably implementing global KeyValueStore#get

Posted by Steven Schlansker <ss...@opentable.com>.
Indeed, all good points.  Thanks all for the continuing valuable feedback!

> On Jun 7, 2017, at 3:07 PM, Matthias J. Sax <ma...@confluent.io> wrote:
> 
> If you write to a remote DB, keep in mind that this will impact your
> Streams app, as you lose data locality.
> 
> Thus, populating a DB from the changelog might be better. It also
> decouples both systems, which gives you the advantage that your Streams app
> can still run if the DB has an issue. If you write directly into the DB and
> the DB is not available, your Streams app is doomed to fail too.
> 
> 
> -Matthias


Re: Reliably implementing global KeyValueStore#get

Posted by "Matthias J. Sax" <ma...@confluent.io>.
If you write to a remote DB, keep in mind that this will impact your
Streams app, as you lose data locality.

Thus, populating a DB from the changelog might be better. It also
decouples both systems, which gives you the advantage that your Streams app
can still run if the DB has an issue. If you write directly into the DB and
the DB is not available, your Streams app is doomed to fail too.


-Matthias


On 6/7/17 2:54 PM, Jan Filipiak wrote:
> Depends; embedded Postgres puts you in the same spot.
> 
> But if you use your state store changelog to materialize into
> Postgres, that might work out decently.
> The current JDBC sink doesn't support deletes, which is an issue, but
> writing a custom sink is not too hard.
> 
> Best Jan


Re: Reliably implementing global KeyValueStore#get

Posted by Jan Filipiak <Ja...@trivago.com>.
Depends; embedded Postgres puts you in the same spot.

But if you use your state store changelog to materialize into
Postgres, that might work out decently.
The current JDBC sink doesn't support deletes, which is an issue, but
writing a custom sink is not too hard.
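
To make the delete problem concrete: a changelog topic signals a deletion
with a record whose value is null (a tombstone), so whatever writes to the
database has to turn that into a DELETE rather than an upsert. Below is a
minimal sketch of that logic, with a Map standing in for the Postgres table
and a hypothetical ChangelogRecord type. Neither is a Connect API; a real
sink would issue SQL instead.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; a Map stands in for the external table.
final class ChangelogMirrorSketch {

    /** One changelog entry; a null value marks the key as deleted. */
    static final class ChangelogRecord {
        final String key;
        final String value;

        ChangelogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Replays records in order: tombstones delete, everything else upserts. */
    static void apply(Map<String, String> table, List<ChangelogRecord> records) {
        for (ChangelogRecord record : records) {
            if (record.value == null) {
                table.remove(record.key);            // would be a DELETE
            } else {
                table.put(record.key, record.value); // would be an upsert
            }
        }
    }
}
```

Because records are replayed in order, the table ends up matching the latest
state of the store, including keys that were removed.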

Best Jan


On 07.06.2017 23:47, Steven Schlansker wrote:
> I was actually considering writing my own KeyValueStore backed
> by e.g. a Postgres or the like.
>
> Is there some feature Connect gains me that would make it better
> than such an approach?
>
> thanks


Re: Reliably implementing global KeyValueStore#get

Posted by Steven Schlansker <ss...@opentable.com>.
I was actually considering writing my own KeyValueStore backed
by e.g. a Postgres or the like.

Is there some feature Connect gains me that would make it better
than such an approach?

thanks

> On Jun 7, 2017, at 2:20 PM, Jan Filipiak <Ja...@trivago.com> wrote:
> 
> Hi,
> 
> have you thought about using connect to put data into a store that is more reasonable for your kind of query requirements?
> 
> Best Jan


Re: Reliably implementing global KeyValueStore#get

Posted by Jan Filipiak <Ja...@trivago.com>.
Hi,

Have you thought about using Connect to put the data into a store that is
better suited to your kind of query requirements?

Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:
>> On Jun 6, 2017, at 2:52 PM, Damian Guy <da...@gmail.com> wrote:
>>
>> Steven,
>>
>> In practice, data shouldn't be migrating that often. If it is then you
>> probably have bigger problems.
> Understood and agreed, but when designing distributed systems, it usually
> helps to model for the worst case rather than the "well that should never
> happen" case, lest you find yourself fixing those bugs at 3am instead :)
>
> I'd like to be able to induce extreme pain at the Kafka layer (change leader
> every 3 seconds and migrate all partitions around randomly) and still have
> my app behave correctly.
>
>> You should be able to use the metadata api
>> to find the instance the key should be on and then when you check that node
>> you can also check with the metadata api that the key should still be on
>> this host. If streams is rebalancing while you query an exception will be
>> raised and you'll need to retry the request once the rebalance has
>> completed.
> Agreed here as well.  But let's assume I have a very fast replication
> setup (assume it takes zero time, for the sake of argument) -- I'm fairly
> sure there's still a race here as this exception only fires *during a migration*
> not *after a migration that may have invalidated your metadata lookup completes*
>
>> HTH,
>> Damian


Re: Reliably implementing global KeyValueStore#get

Posted by Eno Thereska <en...@gmail.com>.
Hi Steven,

You are right in principle. The thing is that what we shipped in Kafka is just the low-level bare bones that, in a sense, belong to Kafka. A middle layer that keeps track of the data is absolutely needed, and it should hopefully hide the distributed-system challenges from the end user. Now the question is what such a layer should look like. I think all systems make some basic assumptions about the frequency of failures and rebalances just to keep the number of retries sane. I agree with you that in principle a rebalance could always be happening, though.
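
As a rough illustration of "keeping the number of retries sane", such a
middle layer could re-resolve the owner on every attempt and give up after a
fixed number of tries. This is only a sketch under assumptions: Remote,
NotOwnedException and the locate function are hypothetical stand-ins for the
HTTP remoting and the metadata lookup, and it presumes the remote side can
signal "not owned" at all.

```java
import java.util.function.Function;

// Hypothetical sketch; Remote and locate stand in for HTTP remoting and metadata.
final class BoundedRetryLookupSketch {

    /** Assumed signal from a remote instance that no longer owns the key. */
    static final class NotOwnedException extends RuntimeException {
    }

    /** Stand-in for a remote instance reachable over HTTP. */
    interface Remote {
        String get(String key);
    }

    /** Looks the key up, re-resolving its owner after each stale-routing failure. */
    static String lookup(String key, Function<String, Remote> locate, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Remote owner = locate.apply(key); // fresh metadata on every attempt
            try {
                return owner.get(key);        // null here means "key absent"
            } catch (NotOwnedException e) {
                // Routing was stale; fall through and try again.
            }
        }
        throw new IllegalStateException(
            "no stable owner for key after " + maxAttempts + " attempts");
    }
}
```

The attempt limit is where the assumption about rebalance frequency becomes
explicit: under constant rebalancing the lookup fails loudly instead of
returning a wrong null.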

Eno

> On 6 Jun 2017, at 23:29, Steven Schlansker <ss...@opentable.com> wrote:
> 
> 
>> On Jun 6, 2017, at 2:52 PM, Damian Guy <da...@gmail.com> wrote:
>> 
>> Steven,
>> 
>> In practice, data shouldn't be migrating that often. If it is then you
>> probably have bigger problems.
> 
> Understood and agreed, but when designing distributed systems, it usually
> helps to model for the worst case rather than the "well that should never
> happen" case, lest you find yourself fixing those bugs at 3am instead :)
> 
> I'd like to be able to induce extreme pain at the Kafka layer (change leader
> every 3 seconds and migrate all partitions around randomly) and still have
> my app behave correctly.
> 
>> You should be able to use the metadata api
>> to find the instance the key should be on and then when you check that node
>> you can also check with the metadata api that the key should still be on
>> this host. If streams is rebalancing while you query an exception will be
>> raised and you'll need to retry the request once the rebalance has
>> completed.
> 
> Agreed here as well.  But let's assume I have a very fast replication
> setup (assume it takes zero time, for the sake of argument) -- I'm fairly
> sure there's still a race here as this exception only fires *during a migration*
> not *after a migration that may have invalidated your metadata lookup completes*
> 
>> 
>> HTH,
>> Damian
>> 
>> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker <ss...@opentable.com>
>> wrote:
>> 
>>> 
>>>> On Jun 6, 2017, at 6:16 AM, Eno Thereska <en...@gmail.com> wrote:
>>>> 
>>>> Hi Steven,
>>>> 
>>>> Do you know beforehand if a key exists? If you know that and are getting
>>>> null, the code will have to retry by refreshing the metadata and going to
>>>> the new instance. If you don’t know beforehand if a key exists or not you
>>>> might have to check all instances of a store to make sure.
>>>> 
>>> 
>>> No, I am not presupposing that the key can exist -- this is a user visible
>>> API and will be prone to "accidents" :)
>>> 
>>> Thanks for the insight.  I worry that even checking all stores is not
>>> truly sufficient, as querying all the different workers at different times
>>> in the presence of migrating data can still in theory miss it given
>>> pessimal execution.
>>> 
>>> I'm sure I've long wandered off into the hypothetical, but I dream of some
>>> day being cool like Jepsen :)
>>> 
>>>> Eno


Re: Reliably implementing global KeyValueStore#get

Posted by Steven Schlansker <ss...@opentable.com>.
> On Jun 6, 2017, at 2:52 PM, Damian Guy <da...@gmail.com> wrote:
> 
> Steven,
> 
> In practice, data shouldn't be migrating that often. If it is then you
> probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.

> You should be able to use the metadata API
> to find the instance the key should be on and then when you check that node
> you can also check with the metadata API that the key should still be on
> this host. If Streams is rebalancing while you query, an exception will be
> raised and you'll need to retry the request once the rebalance has
> completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here, as this exception only fires *during a migration*,
not *after a migration that may have invalidated your metadata lookup has completed*.


Re: Reliably implementing global KeyValueStore#get

Posted by Damian Guy <da...@gmail.com>.
Steven,

In practice, data shouldn't be migrating that often. If it is, then you
probably have bigger problems. You should be able to use the metadata API
to find the instance the key should be on, and then when you query that node
you can also check with the metadata API that the key should still be on
that host. If Streams is rebalancing while you query, an exception will be
raised and you'll need to retry the request once the rebalance has
completed.
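
A rough sketch of that flow, using in-process stand-ins rather than the
real API. The ownerOf function plays the role of the metadata lookup (in
Kafka Streams that would presumably be something like
KafkaStreams#metadataForKey), and NotOwnerException stands in for "go
back and re-resolve". All class and method names below are assumptions
made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins; this is not the Kafka Streams API.
class OwnerCheckedInstance {
    /** Tells the caller to refresh its routing, as opposed to "key is absent". */
    static class NotOwnerException extends RuntimeException {
        NotOwnerException(String host, String key) {
            super(host + " does not currently own key " + key);
        }
    }

    final String host;
    final Map<String, String> localStore = new HashMap<>();
    // Stand-in for a metadata lookup: key -> host that should hold it right now.
    private final Function<String, String> ownerOf;

    OwnerCheckedInstance(String host, Function<String, String> ownerOf) {
        this.host = host;
        this.ownerOf = ownerOf;
    }

    // Body of the HTTP endpoint: re-check ownership on the serving node,
    // both before and after the local read.
    String get(String key) {
        if (!host.equals(ownerOf.apply(key))) {
            throw new NotOwnerException(host, key);
        }
        String value = localStore.get(key);
        if (!host.equals(ownerOf.apply(key))) {
            throw new NotOwnerException(host, key);
        }
        return value;
    }
}

class OwnerCheckedRouter {
    // Client side: look up the owner, call it, and retry with fresh metadata
    // whenever the target reports that it is not the owner.
    // Assumes every host returned by ownerOf is present in `instances`.
    static String get(String key,
                      Map<String, OwnerCheckedInstance> instances,
                      Function<String, String> ownerOf,
                      int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            OwnerCheckedInstance target = instances.get(ownerOf.apply(key));
            try {
                return target.get(key);
            } catch (OwnerCheckedInstance.NotOwnerException e) {
                // Ownership moved between lookup and read; loop to re-resolve.
            }
        }
        throw new IllegalStateException("ownership of " + key + " kept changing");
    }
}
```

The two checks narrow the window but are not atomic with the read, so
ownership moving away and back between them would still go unnoticed.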

HTH,
Damian


Re: Reliably implementing global KeyValueStore#get

Posted by Steven Schlansker <ss...@opentable.com>.
> On Jun 6, 2017, at 6:16 AM, Eno Thereska <en...@gmail.com> wrote:
> 
> Hi Steven,
> 
> Do you know beforehand if a key exists? If you know that and are getting null, the code will have to retry by refreshing the metadata and going to the new instance. If you don’t know beforehand if a key exists or not, you might have to check all instances of a store to make sure.
> 

No, I am not presupposing that the key exists -- this is a user-visible API and will
be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not truly sufficient,
as querying all the different workers at different times in the presence of migrating data
can still, in theory, miss the key given a pessimal execution.
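
To make that pessimal execution concrete, here is a small deterministic
model of it. Each Map stands in for one instance's local store, and the
betweenQueries hook is an assumption of the sketch that models whatever
the cluster does while the next request is in flight. None of this is
Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Deterministic model of the interleaving; nothing here is Kafka Streams API.
class ScanAllRace {
    // Ask every instance in turn. `betweenQueries` models whatever the cluster
    // does while the next request is in flight.
    static String scanAll(List<Map<String, String>> instances,
                          String key,
                          Runnable betweenQueries) {
        for (Map<String, String> instance : instances) {
            String value = instance.get(key);
            if (value != null) {
                return value;
            }
            betweenQueries.run();
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        Map<String, String> b = new HashMap<>();
        b.put("k", "v"); // the key starts out on b

        List<Map<String, String>> instances = new ArrayList<>();
        instances.add(a);
        instances.add(b);

        // Pessimal schedule: right after a is queried, "k" migrates from b to a.
        Runnable migrateToA = () -> {
            String moved = b.remove("k");
            if (moved != null) {
                a.put("k", moved);
            }
        };

        System.out.println(scanAll(instances, "k", migrateToA)); // prints "null"
        System.out.println(a.get("k"));                          // prints "v"
    }
}
```

The key exists on some instance at every point in time, yet the scan
reports a miss because each instance was asked at the wrong moment.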

I'm sure I've long wandered off into the hypothetical, but I dream of some day being
cool like Jepsen :)



Re: Reliably implementing global KeyValueStore#get

Posted by Eno Thereska <en...@gmail.com>.
Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting null, the code will have to retry by refreshing the metadata and going to the new instance. If you don’t know beforehand if a key exists or not, you might have to check all instances of a store to make sure.

Eno

