Posted to users@kafka.apache.org by Boris Lublinsky <bo...@lightbend.com> on 2017/11/13 17:43:41 UTC

Queryable state

I have updated my queryable state example, which is based on https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries, to Kafka 1.0.0.

Now, when I try to get the instances hosting a store while running on a local machine, I get an empty list from:

public List<HostStoreInfo> streamsMetadataForStore(final String store) {
    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    final Collection<StreamsMetadata> metadata = streams.allMetadataForStore(store);
    return mapInstancesToHostStoreInfo(metadata);
}
This happens because of the following check in StreamsMetadataState:

public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
    Objects.requireNonNull(storeName, "storeName cannot be null");

    if (!isInitialized()) {
        return Collections.emptyList();
    }
The isInitialized() method is:

private boolean isInitialized() {
    return clusterMetadata != null && !clusterMetadata.topics().isEmpty();
}
This check fails because clusterMetadata is null.

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/


Re: Queryable state

Posted by alexey yakubovich <al...@yahoo.com.INVALID>.
O, Kafka. Franz? How are you? Here and there? What is Lightbend? This one: https://www.lightbend.com/?
I had/have cancer. It seems stable now. So for fun I am looking for a job, exclusively a remote one. If you know of or happen to hear about one, please send it to me. I feel mostly pretty well: good enough to play with development remotely, not good enough to work in the office.
Anyway, have more light and don't bend too low.
Alexey

On Thursday, November 16, 2017, 10:43:10 PM CST, Boris Lublinsky <bo...@lightbend.com> wrote:
 
Thanks, Guozhang.

I do not think it's ever updated.
I waited for a while.
I did implement a workaround.
Can you also look at my embedded Kafka question?

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/


Re: Queryable state

Posted by Boris Lublinsky <bo...@lightbend.com>.
Thanks, Guozhang.

I do not think it's ever updated.
I waited for a while.
I did implement a workaround.
Can you also look at my embedded Kafka question?

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

> On Nov 16, 2017, at 10:29 PM, Guozhang Wang <wa...@gmail.com> wrote:
> 
> Hello Boris,
> 
> The reason for this check is to make sure that the cluster metadata has been
> updated at least once, meaning that the instance has gone through the
> initialization phase of the rebalance and has received the assignment
> information already. Before this phase, any metadata returned may be
> incorrect.
> 
> To get around this scenario, you may need to wait on the state of the
> running instance to go from Rebalancing to Running, which means it has been
> stabilized with the global metadata. This can be done by passing in your
> customized state change listener:
> 
> public void setStateListener(final KafkaStreams.StateListener listener)
> 
> 
> Guozhang
> 
> 


Re: Queryable state

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Boris,

The reason for this check is to make sure that the cluster metadata has been
updated at least once, meaning that the instance has gone through the
initialization phase of the rebalance and has received the assignment
information already. Before this phase, any metadata returned may be
incorrect.

To get around this scenario, you may need to wait on the state of the
running instance to go from Rebalancing to Running, which means it has been
stabilized with the global metadata. This can be done by passing in your
customized state change listener:

public void setStateListener(final KafkaStreams.StateListener listener)
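
A minimal sketch of the waiting pattern described above, for illustration only. To keep it compilable without the Kafka Streams dependency, the State enum and StateListener interface below are stand-ins that mirror the shape of KafkaStreams.State and KafkaStreams.StateListener (whose callback is onChange(newState, oldState)). The RunningGate class and its method names are hypothetical and not part of Kafka.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: opens a gate once the REBALANCING -> RUNNING transition is seen.
class RunningGate {
    // Stand-in for KafkaStreams.State (only the states used in this sketch).
    enum State { CREATED, REBALANCING, RUNNING }

    // Stand-in mirroring KafkaStreams.StateListener.
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    private final CountDownLatch running = new CountDownLatch(1);

    // Listener to register on the streams instance.
    StateListener listener() {
        return (newState, oldState) -> {
            if (oldState == State.REBALANCING && newState == State.RUNNING) {
                running.countDown();
            }
        };
    }

    // Waits up to timeoutMillis for the transition; returns true if it was observed.
    boolean awaitRunning(long timeoutMillis) {
        try {
            return running.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In a real application the same lambda body would presumably be written against KafkaStreams.State and passed to streams.setStateListener(...), with metadata queries such as allMetadataForStore issued only after awaitRunning returns true. As far as I know, the listener has to be registered before streams.start() is called.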


Guozhang


-- 
-- Guozhang