You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Shekar Tippur <ct...@gmail.com> on 2017/08/04 23:16:12 UTC

Re: Kafka streams regex match

Damian,

I am getting a syntax error. I have responded on gist.
Appreciate any inputs.

- Shekar

On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com> wrote:

> Hi,
>
> I left a comment on your gist.
>
> Thanks,
> Damian
>
> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
>
> > Damien,
> >
> > Here is a public gist:
> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >
> > - Shekar
> >
> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > It might be easier if you make a github gist with your code. It is
> quite
> > > difficult to see what is happening in an email.
> > >
> > > Cheers,
> > > Damian
> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com> wrote:
> > >
> > > > Thanks a lot Damien.
> > > > I am able to get to see if the join worked (using foreach). I tried
> to
> > > add
> > > > the logic to query the store after starting the streams:
> > > > Looks like the code is not getting there. Here is the modified code:
> > > >
> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > >
> > > > streams.start();
> > > >
> > > >
> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> > > >     @Override
> > > >     public void apply(String key, JsonNode value) {
> > > >         System.out.println(key + ": " + value);
> > > >         if (value == null){
> > > >             System.out.println("null match");
> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > >                     null;
> > > >             try {
> > > >                 keyValueStore =
> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > QueryableStoreTypes.keyValueStore(), streams);
> > > >             } catch (InterruptedException e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >
> > > >             KeyValueIterator  kviterator =
> > > > keyValueStore.range("test_nod","test_node");
> > > >         }
> > > >     }
> > > > });
> > > >
> > > >
> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > > The store won't be queryable until after you have called
> > > streams.start().
> > > > > No stores have been created until the application is up and running
> > and
> > > > > they are dependent on the underlying partitions.
> > > > >
> > > > > To check that a stateful operation has produced a result you would
> > > > normally
> > > > > add another operation after the join, i.e.,
> > > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> > > topic")
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
> > wrote:
> > > > >
> > > > > > One more thing.. How do we check if the stateful join operation
> > > > resulted
> > > > > in
> > > > > > a kstream of some value in it (size of kstream)? How do we check
> > the
> > > > > > content of a kstream?
> > > > > >
> > > > > > - S
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
> ctippur@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Damien,
> > > > > > >
> > > > > > > Thanks a lot for pointing out.
> > > > > > >
> > > > > > > I got a little further. I am kind of stuck with the sequencing.
> > > > Couple
> > > > > of
> > > > > > > issues:
> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > > 2. Do I need to create a new KafkaStreams object when I create
> a
> > > > > > > KeyValueStore?
> > > > > > > 3. How do I initialize KeyValueIterator with <String,
> JsonNode> I
> > > > seem
> > > > > to
> > > > > > > get a error when I try:
> > > > > > > *KeyValueIterator <String,JsonNode> kviterator
> > > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > >
> > > > > > > /////// START CODE /////////
> > > > > > > //parser is a kstream as a result of join
> > > > > > > if (parser.toString().matches("null")){
> > > > > > >
> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > > > > >             null;
> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > > >     try {
> > > > > > >         keyValueStore =
> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > > >     } catch (InterruptedException e) {
> > > > > > >         e.printStackTrace();
> > > > > > >     }
> > > > > > > *    KeyValueIterator kviterator
> > > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > > }else {
> > > > > > >
> > > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> > > > "parser");*}
> > > > > > >
> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > > > streams.start();
> > > > > > >
> > > > > > > /////// END CODE /////////
> > > > > > >
> > > > > > > - S
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> > damian.guy@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > > > main/java/org/apache/kafka/streams/state/
> > > ReadOnlyKeyValueStore.java
> > > > > > > >
> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
> ctippur@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > That's cool. This feature is a part of rocksdb object and
> not
> > > > > ktable?
> > > > > > > > >
> > > > > > > > > Sent from my iPhone
> > > > > > > > >
> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
> > damian.guy@gmail.com>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Yes they can be strings,
> > > > > > > > > >
> > > > > > > > > > so you could do something like:
> > > > > > > > > > store.range("test_host", "test_hosu");
> > > > > > > > > >
> > > > > > > > > > This would return an iterator containing all of the
> values
> > > > > > > (inclusive)
> > > > > > > > > from
> > > > > > > > > > "test_host" -> "test_hosu".
> > > > > > > > > >
> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
> > > ctippur@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> Can you please point me to an example? Can from and to
> be
> > a
> > > > > > string?
> > > > > > > > > >>
> > > > > > > > > >> Sent from my iPhone
> > > > > > > > > >>
> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> > > damian.guy@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >>>
> > > > > > > > > >>> Hi,
> > > > > > > > > >>>
> > > > > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > > > > > >>>
> > > > > > > > > >>> Thanks,
> > > > > > > > > >>> Damian
> > > > > > > > > >>>
> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> > > > ctippur@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > > > >>>>
> > > > > > > > > >>>> Hello,
> > > > > > > > > >>>>
> > > > > > > > > >>>> I am able to get the kstream to ktable join work. I
> have
> > > > some
> > > > > > use
> > > > > > > > > cases
> > > > > > > > > >>>> where the key is not always a exact match.
> > > > > > > > > >>>> I was wondering if there is a way to lookup keys based
> > on
> > > > > regex.
> > > > > > > > > >>>>
> > > > > > > > > >>>> For example,
> > > > > > > > > >>>> I have these entries for a ktable:
> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1"
> }
> > > > > > > > > >>>>
> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1":
> "test2_l2" }
> > > > > > > > > >>>>
> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1":
> "test3_l3" }
> > > > > > > > > >>>>
> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > > > > >>>>
> > > > > > > > > >>>> and this for a kstream:
> > > > > > > > > >>>>
> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test
> ":
> > {
> > > > > > > > > >> "creation_time ":
> > > > > > > > > >>>> "1234 " } } }
> > > > > > > > > >>>>
> > > > > > > > > >>>> In this case, if the exact match does not work, I
> would
> > > like
> > > > > to
> > > > > > > lookup
> > > > > > > > > >>>> ktable for all entries that contains "test_host*" in
> it
> > > and
> > > > > have
> > > > > > > > > >>>> application logic to determine what would be the best
> > fit.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Appreciate input.
> > > > > > > > > >>>>
> > > > > > > > > >>>> - Shekar
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
I am running this on a mac laptop. I am using defaults.

Sent from my iPhone

> On Aug 8, 2017, at 03:11, Damian Guy <da...@gmail.com> wrote:
> 
> Hi Shekar, that warning is expected during rebalances and should generally
> resolve itself.
> How many threads/app instances are you running?
> It is impossible to tell what is happening with the full logs.
> 
> Thanks,
> Damian
> 
>> On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ct...@gmail.com> wrote:
>> 
>> Damien,
>> 
>> Thanks for pointing out the error. I had tried a different version of
>> initializing the store.
>> 
>> Now that I am able to compile, I started to get the below error. I looked
>> up other suggestions for the same error and followed up to upgrade Kafka to
>> 0.11.0.0 version. I still get this error :/
>> 
>> [2017-08-07 14:40:41,264] WARN stream-thread
>> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
>> not create task 0_0. Will retry:
>> (org.apache.kafka.streams.processor.internals.StreamThread)
>> 
>> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
>> the state directory for task 0_0
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>> 
>>> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ct...@gmail.com> wrote:
>>> 
>>> Damian,
>>> 
>>> I am getting a syntax error. I have responded on gist.
>>> Appreciate any inputs.
>>> 
>>> - Shekar
>>> 
>>> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com>
>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I left a comment on your gist.
>>>> 
>>>> Thanks,
>>>> Damian
>>>> 
>>>>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
>>>>> 
>>>>> Damien,
>>>>> 
>>>>> Here is a public gist:
>>>>> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>>>>> 
>>>>> - Shekar
>>>>> 
>>>>> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> It might be easier if you make a github gist with your code. It is
>>>> quite
>>>>>> difficult to see what is happening in an email.
>>>>>> 
>>>>>> Cheers,
>>>>>> Damian
>>>>>> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Thanks a lot Damien.
>>>>>>> I am able to get to see if the join worked (using foreach). I
>> tried
>>>> to
>>>>>> add
>>>>>>> the logic to query the store after starting the streams:
>>>>>>> Looks like the code is not getting there. Here is the modified
>> code:
>>>>>>> 
>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>> 
>>>>>>> streams.start();
>>>>>>> 
>>>>>>> 
>>>>>>> parser.foreach(new ForeachAction<String, JsonNode>() {
>>>>>>>    @Override
>>>>>>>    public void apply(String key, JsonNode value) {
>>>>>>>        System.out.println(key + ": " + value);
>>>>>>>        if (value == null){
>>>>>>>            System.out.println("null match");
>>>>>>>            ReadOnlyKeyValueStore<String, Long> keyValueStore =
>>>>>>>                    null;
>>>>>>>            try {
>>>>>>>                keyValueStore =
>>>>>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>> QueryableStoreTypes.keyValueStore(), streams);
>>>>>>>            } catch (InterruptedException e) {
>>>>>>>                e.printStackTrace();
>>>>>>>            }
>>>>>>> 
>>>>>>>            KeyValueIterator  kviterator =
>>>>>>> keyValueStore.range("test_nod","test_node");
>>>>>>>        }
>>>>>>>    }
>>>>>>> });
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <
>> damian.guy@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> The store won't be queryable until after you have called
>>>>>> streams.start().
>>>>>>>> No stores have been created until the application is up and
>>>> running
>>>>> and
>>>>>>>> they are dependent on the underlying partitions.
>>>>>>>> 
>>>>>>>> To check that a stateful operation has produced a result you
>> would
>>>>>>> normally
>>>>>>>> add another operation after the join, i.e.,
>>>>>>>> stream.join(other,...).foreach(..) or
>> stream.join(other,...).to("
>>>>>> topic")
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>> 
>>>>>>>> On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> One more thing.. How do we check if the stateful join
>> operation
>>>>>>> resulted
>>>>>>>> in
>>>>>>>>> a kstream of some value in it (size of kstream)? How do we
>> check
>>>>> the
>>>>>>>>> content of a kstream?
>>>>>>>>> 
>>>>>>>>> - S
>>>>>>>>> 
>>>>>>>>> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
>>>> ctippur@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Damien,
>>>>>>>>>> 
>>>>>>>>>> Thanks a lot for pointing out.
>>>>>>>>>> 
>>>>>>>>>> I got a little further. I am kind of stuck with the
>>>> sequencing.
>>>>>>> Couple
>>>>>>>> of
>>>>>>>>>> issues:
>>>>>>>>>> 1. I cannot initialise KafkaStreams before the parser.to().
>>>>>>>>>> 2. Do I need to create a new KafkaStreams object when I
>>>> create a
>>>>>>>>>> KeyValueStore?
>>>>>>>>>> 3. How do I initialize KeyValueIterator with <String,
>>>> JsonNode> I
>>>>>>> seem
>>>>>>>> to
>>>>>>>>>> get a error when I try:
>>>>>>>>>> *KeyValueIterator <String,JsonNode> kviterator
>>>>>>>>>> = keyValueStore.range("test_nod","test_node");*
>>>>>>>>>> 
>>>>>>>>>> /////// START CODE /////////
>>>>>>>>>> //parser is a kstream as a result of join
>>>>>>>>>> if (parser.toString().matches("null")){
>>>>>>>>>> 
>>>>>>>>>>    ReadOnlyKeyValueStore<String, Long> keyValueStore =
>>>>>>>>>>            null;
>>>>>>>>>>    KafkaStreams newstreams = new KafkaStreams(builder,
>>>> props);
>>>>>>>>>>    try {
>>>>>>>>>>        keyValueStore =
>>>>>>>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>>>>> QueryableStoreTypes.keyValueStore(), newstreams);
>>>>>>>>>>    } catch (InterruptedException e) {
>>>>>>>>>>        e.printStackTrace();
>>>>>>>>>>    }
>>>>>>>>>> *    KeyValueIterator kviterator
>>>>>>>>>> = keyValueStore.range("test_nod","test_node");*
>>>>>>>>>> }else {
>>>>>>>>>> 
>>>>>>>>>> *    parser.to <http://parser.to>(stringSerde, jsonSerde,
>>>>>>> "parser");*}
>>>>>>>>>> 
>>>>>>>>>> *KafkaStreams streams = new KafkaStreams(builder, props);*
>>>>>>>>>> streams.start();
>>>>>>>>>> 
>>>>>>>>>> /////// END CODE /////////
>>>>>>>>>> 
>>>>>>>>>> - S
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
>>>>> damian.guy@gmail.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> It is part of the ReadOnlyKeyValueStore interface:
>>>>>>>>>>> 
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/
>>>>>>>>>> main/java/org/apache/kafka/streams/state/
>>>>>> ReadOnlyKeyValueStore.java
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
>>>> ctippur@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> That's cool. This feature is a part of rocksdb object
>> and
>>>> not
>>>>>>>> ktable?
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Jul 27, 2017, at 07:57, Damian Guy <
>>>>> damian.guy@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Yes they can be strings,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> so you could do something like:
>>>>>>>>>>>>> store.range("test_host", "test_hosu");
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This would return an iterator containing all of the
>>>> values
>>>>>>>>>> (inclusive)
>>>>>>>>>>>> from
>>>>>>>>>>>>> "test_host" -> "test_hosu".
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
>>>>>> ctippur@gmail.com
>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Can you please point me to an example? Can from and
>> to
>>>> be
>>>>> a
>>>>>>>>> string?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Jul 27, 2017, at 04:04, Damian Guy <
>>>>>> damian.guy@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> You can't use a regex, but you could use a range
>>>> query.
>>>>>>>>>>>>>>> i.e, keyValueStore.range(from, to)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
>>>>>>> ctippur@gmail.com
>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I am able to get the kstream to ktable join work. I
>>>> have
>>>>>>> some
>>>>>>>>> use
>>>>>>>>>>>> cases
>>>>>>>>>>>>>>>> where the key is not always a exact match.
>>>>>>>>>>>>>>>> I was wondering if there is a way to lookup keys
>>>> based
>>>>> on
>>>>>>>> regex.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> For example,
>>>>>>>>>>>>>>>> I have these entries for a ktable:
>>>>>>>>>>>>>>>> test_host1,{ "source": "test_host", "UL1":
>>>> "test1_l1" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host2,{ "source": "test_host2", "UL1":
>>>> "test2_l2" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host3,{ "source": "test_host3", "UL1":
>>>> "test3_l3" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> and this for a kstream:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host,{ "source": "test_host", "custom": {
>> "test
>>>> ":
>>>>> {
>>>>>>>>>>>>>> "creation_time ":
>>>>>>>>>>>>>>>> "1234 " } } }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In this case, if the exact match does not work, I
>>>> would
>>>>>> like
>>>>>>>> to
>>>>>>>>>> lookup
>>>>>>>>>>>>>>>> ktable for all entries that contains "test_host*"
>> in
>>>> it
>>>>>> and
>>>>>>>> have
>>>>>>>>>>>>>>>> application logic to determine what would be the
>> best
>>>>> fit.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Appreciate input.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> - Shekar
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
Hi Shekar, that warning is expected during rebalances and should generally
resolve itself.
How many threads/app instances are you running?
It is impossible to tell what is happening with the full logs.

Thanks,
Damian

On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ct...@gmail.com> wrote:

> Damien,
>
> Thanks for pointing out the error. I had tried a different version of
> initializing the store.
>
> Now that I am able to compile, I started to get the below error. I looked
> up other suggestions for the same error and followed up to upgrade Kafka to
> 0.11.0.0 version. I still get this error :/
>
> [2017-08-07 14:40:41,264] WARN stream-thread
> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
> not create task 0_0. Will retry:
> (org.apache.kafka.streams.processor.internals.StreamThread)
>
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
> the state directory for task 0_0
>
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>
> at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>
> at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>
> at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>
> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Damian,
> >
> > I am getting a syntax error. I have responded on gist.
> > Appreciate any inputs.
> >
> > - Shekar
> >
> > On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> I left a comment on your gist.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
> >>
> >> > Damien,
> >> >
> >> > Here is a public gist:
> >> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >> >
> >> > - Shekar
> >> >
> >> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
> >> wrote:
> >> >
> >> > > It might be easier if you make a github gist with your code. It is
> >> quite
> >> > > difficult to see what is happening in an email.
> >> > >
> >> > > Cheers,
> >> > > Damian
> >> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Thanks a lot Damien.
> >> > > > I am able to get to see if the join worked (using foreach). I
> tried
> >> to
> >> > > add
> >> > > > the logic to query the store after starting the streams:
> >> > > > Looks like the code is not getting there. Here is the modified
> code:
> >> > > >
> >> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> >> > > >
> >> > > > streams.start();
> >> > > >
> >> > > >
> >> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> >> > > >     @Override
> >> > > >     public void apply(String key, JsonNode value) {
> >> > > >         System.out.println(key + ": " + value);
> >> > > >         if (value == null){
> >> > > >             System.out.println("null match");
> >> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >> > > >                     null;
> >> > > >             try {
> >> > > >                 keyValueStore =
> >> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > QueryableStoreTypes.keyValueStore(), streams);
> >> > > >             } catch (InterruptedException e) {
> >> > > >                 e.printStackTrace();
> >> > > >             }
> >> > > >
> >> > > >             KeyValueIterator  kviterator =
> >> > > > keyValueStore.range("test_nod","test_node");
> >> > > >         }
> >> > > >     }
> >> > > > });
> >> > > >
> >> > > >
> >> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <
> damian.guy@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > > The store won't be queryable until after you have called
> >> > > streams.start().
> >> > > > > No stores have been created until the application is up and
> >> running
> >> > and
> >> > > > > they are dependent on the underlying partitions.
> >> > > > >
> >> > > > > To check that a stateful operation has produced a result you
> would
> >> > > > normally
> >> > > > > add another operation after the join, i.e.,
> >> > > > > stream.join(other,...).foreach(..) or
> stream.join(other,...).to("
> >> > > topic")
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Damian
> >> > > > >
> >> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
> >> > wrote:
> >> > > > >
> >> > > > > > One more thing.. How do we check if the stateful join
> operation
> >> > > > resulted
> >> > > > > in
> >> > > > > > a kstream of some value in it (size of kstream)? How do we
> check
> >> > the
> >> > > > > > content of a kstream?
> >> > > > > >
> >> > > > > > - S
> >> > > > > >
> >> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
> >> ctippur@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Damien,
> >> > > > > > >
> >> > > > > > > Thanks a lot for pointing out.
> >> > > > > > >
> >> > > > > > > I got a little further. I am kind of stuck with the
> >> sequencing.
> >> > > > Couple
> >> > > > > of
> >> > > > > > > issues:
> >> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> >> > > > > > > 2. Do I need to create a new KafkaStreams object when I
> >> create a
> >> > > > > > > KeyValueStore?
> >> > > > > > > 3. How do I initialize KeyValueIterator with <String,
> >> JsonNode> I
> >> > > > seem
> >> > > > > to
> >> > > > > > > get a error when I try:
> >> > > > > > > *KeyValueIterator <String,JsonNode> kviterator
> >> > > > > > > = keyValueStore.range("test_nod","test_node");*
> >> > > > > > >
> >> > > > > > > /////// START CODE /////////
> >> > > > > > > //parser is a kstream as a result of join
> >> > > > > > > if (parser.toString().matches("null")){
> >> > > > > > >
> >> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >> > > > > > >             null;
> >> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder,
> >> props);
> >> > > > > > >     try {
> >> > > > > > >         keyValueStore =
> >> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> >> > > > > > >     } catch (InterruptedException e) {
> >> > > > > > >         e.printStackTrace();
> >> > > > > > >     }
> >> > > > > > > *    KeyValueIterator kviterator
> >> > > > > > > = keyValueStore.range("test_nod","test_node");*
> >> > > > > > > }else {
> >> > > > > > >
> >> > > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> >> > > > "parser");*}
> >> > > > > > >
> >> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> >> > > > > > > streams.start();
> >> > > > > > >
> >> > > > > > > /////// END CODE /////////
> >> > > > > > >
> >> > > > > > > - S
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> >> > damian.guy@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> >> > > > > > > >
> >> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> >> > > > > > > main/java/org/apache/kafka/streams/state/
> >> > > ReadOnlyKeyValueStore.java
> >> > > > > > > >
> >> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
> >> ctippur@gmail.com>
> >> > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > That's cool. This feature is a part of rocksdb object
> and
> >> not
> >> > > > > ktable?
> >> > > > > > > > >
> >> > > > > > > > > Sent from my iPhone
> >> > > > > > > > >
> >> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
> >> > damian.guy@gmail.com>
> >> > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > Yes they can be strings,
> >> > > > > > > > > >
> >> > > > > > > > > > so you could do something like:
> >> > > > > > > > > > store.range("test_host", "test_hosu");
> >> > > > > > > > > >
> >> > > > > > > > > > This would return an iterator containing all of the
> >> values
> >> > > > > > > (inclusive)
> >> > > > > > > > > from
> >> > > > > > > > > > "test_host" -> "test_hosu".
> >> > > > > > > > > >
> >> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
> >> > > ctippur@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > > > > >>
> >> > > > > > > > > >> Can you please point me to an example? Can from and
> to
> >> be
> >> > a
> >> > > > > > string?
> >> > > > > > > > > >>
> >> > > > > > > > > >> Sent from my iPhone
> >> > > > > > > > > >>
> >> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> >> > > damian.guy@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> Hi,
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> You can't use a regex, but you could use a range
> >> query.
> >> > > > > > > > > >>> i.e, keyValueStore.range(from, to)
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> Thanks,
> >> > > > > > > > > >>> Damian
> >> > > > > > > > > >>>
> >> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> >> > > > ctippur@gmail.com
> >> > > > > >
> >> > > > > > > wrote:
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> Hello,
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> I am able to get the kstream to ktable join work. I
> >> have
> >> > > > some
> >> > > > > > use
> >> > > > > > > > > cases
> >> > > > > > > > > >>>> where the key is not always a exact match.
> >> > > > > > > > > >>>> I was wondering if there is a way to lookup keys
> >> based
> >> > on
> >> > > > > regex.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> For example,
> >> > > > > > > > > >>>> I have these entries for a ktable:
> >> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1":
> >> "test1_l1" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1":
> >> "test2_l2" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1":
> >> "test3_l3" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> and this for a kstream:
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": {
> "test
> >> ":
> >> > {
> >> > > > > > > > > >> "creation_time ":
> >> > > > > > > > > >>>> "1234 " } } }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> In this case, if the exact match does not work, I
> >> would
> >> > > like
> >> > > > > to
> >> > > > > > > lookup
> >> > > > > > > > > >>>> ktable for all entries that contains "test_host*"
> in
> >> it
> >> > > and
> >> > > > > have
> >> > > > > > > > > >>>> application logic to determine what would be the
> best
> >> > fit.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> Appreciate input.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> - Shekar
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>
> >> > > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Damien,

Thanks for pointing out the error. I had tried a different version of
initializing the store.

Now that I am able to compile, I started to get the below error. I looked
up other suggestions for the same error and followed up to upgrade Kafka to
0.11.0.0 version. I still get this error :/

[2017-08-07 14:40:41,264] WARN stream-thread
[streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
not create task 0_0. Will retry:
(org.apache.kafka.streams.processor.internals.StreamThread)

org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
the state directory for task 0_0

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)

at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)

at
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)

at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ct...@gmail.com> wrote:

> Damian,
>
> I am getting a syntax error. I have responded on gist.
> Appreciate any inputs.
>
> - Shekar
>
> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com> wrote:
>
>> Hi,
>>
>> I left a comment on your gist.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
>>
>> > Damien,
>> >
>> > Here is a public gist:
>> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>> >
>> > - Shekar
>> >
>> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
>> wrote:
>> >
>> > > It might be easier if you make a github gist with your code. It is
>> quite
>> > > difficult to see what is happening in an email.
>> > >
>> > > Cheers,
>> > > Damian
>> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com>
>> wrote:
>> > >
>> > > > Thanks a lot Damien.
>> > > > I am able to get to see if the join worked (using foreach). I tried
>> to
>> > > add
>> > > > the logic to query the store after starting the streams:
>> > > > Looks like the code is not getting there. Here is the modified code:
>> > > >
>> > > > KafkaStreams streams = new KafkaStreams(builder, props);
>> > > >
>> > > > streams.start();
>> > > >
>> > > >
>> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
>> > > >     @Override
>> > > >     public void apply(String key, JsonNode value) {
>> > > >         System.out.println(key + ": " + value);
>> > > >         if (value == null){
>> > > >             System.out.println("null match");
>> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
>> > > >                     null;
>> > > >             try {
>> > > >                 keyValueStore =
>> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>> > > > QueryableStoreTypes.keyValueStore(), streams);
>> > > >             } catch (InterruptedException e) {
>> > > >                 e.printStackTrace();
>> > > >             }
>> > > >
>> > > >             KeyValueIterator  kviterator =
>> > > > keyValueStore.range("test_nod","test_node");
>> > > >         }
>> > > >     }
>> > > > });
>> > > >
>> > > >
>> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi,
>> > > > > The store won't be queryable until after you have called
>> > > streams.start().
>> > > > > No stores have been created until the application is up and
>> running
>> > and
>> > > > > they are dependent on the underlying partitions.
>> > > > >
>> > > > > To check that a stateful operation has produced a result you would
>> > > > normally
>> > > > > add another operation after the join, i.e.,
>> > > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
>> > > topic")
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
>> > wrote:
>> > > > >
>> > > > > > One more thing.. How do we check if the stateful join operation
>> > > > resulted
>> > > > > in
>> > > > > > a kstream of some value in it (size of kstream)? How do we check
>> > the
>> > > > > > content of a kstream?
>> > > > > >
>> > > > > > - S
>> > > > > >
>> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
>> ctippur@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Damien,
>> > > > > > >
>> > > > > > > Thanks a lot for pointing out.
>> > > > > > >
>> > > > > > > I got a little further. I am kind of stuck with the
>> sequencing.
>> > > > Couple
>> > > > > of
>> > > > > > > issues:
>> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
>> > > > > > > 2. Do I need to create a new KafkaStreams object when I
>> create a
>> > > > > > > KeyValueStore?
>> > > > > > > 3. How do I initialize KeyValueIterator with <String,
>> JsonNode> I
>> > > > seem
>> > > > > to
>> > > > > > > get a error when I try:
>> > > > > > > *KeyValueIterator <String,JsonNode> kviterator
>> > > > > > > = keyValueStore.range("test_nod","test_node");*
>> > > > > > >
>> > > > > > > /////// START CODE /////////
>> > > > > > > //parser is a kstream as a result of join
>> > > > > > > if (parser.toString().matches("null")){
>> > > > > > >
>> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
>> > > > > > >             null;
>> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder,
>> props);
>> > > > > > >     try {
>> > > > > > >         keyValueStore =
>> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
>> > > > > > >     } catch (InterruptedException e) {
>> > > > > > >         e.printStackTrace();
>> > > > > > >     }
>> > > > > > > *    KeyValueIterator kviterator
>> > > > > > > = keyValueStore.range("test_nod","test_node");*
>> > > > > > > }else {
>> > > > > > >
>> > > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
>> > > > "parser");*}
>> > > > > > >
>> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
>> > > > > > > streams.start();
>> > > > > > >
>> > > > > > > /////// END CODE /////////
>> > > > > > >
>> > > > > > > - S
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
>> > damian.guy@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
>> > > > > > > >
>> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
>> > > > > > > main/java/org/apache/kafka/streams/state/
>> > > ReadOnlyKeyValueStore.java
>> > > > > > > >
>> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
>> ctippur@gmail.com>
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > > That's cool. This feature is a part of rocksdb object and
>> not
>> > > > > ktable?
>> > > > > > > > >
>> > > > > > > > > Sent from my iPhone
>> > > > > > > > >
>> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
>> > damian.guy@gmail.com>
>> > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > Yes they can be strings,
>> > > > > > > > > >
>> > > > > > > > > > so you could do something like:
>> > > > > > > > > > store.range("test_host", "test_hosu");
>> > > > > > > > > >
>> > > > > > > > > > This would return an iterator containing all of the
>> values
>> > > > > > > (inclusive)
>> > > > > > > > > from
>> > > > > > > > > > "test_host" -> "test_hosu".
>> > > > > > > > > >
>> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
>> > > ctippur@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > > >>
>> > > > > > > > > >> Can you please point me to an example? Can from and to
>> be
>> > a
>> > > > > > string?
>> > > > > > > > > >>
>> > > > > > > > > >> Sent from my iPhone
>> > > > > > > > > >>
>> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
>> > > damian.guy@gmail.com>
>> > > > > > > wrote:
>> > > > > > > > > >>>
>> > > > > > > > > >>> Hi,
>> > > > > > > > > >>>
>> > > > > > > > > >>> You can't use a regex, but you could use a range
>> query.
>> > > > > > > > > >>> i.e, keyValueStore.range(from, to)
>> > > > > > > > > >>>
>> > > > > > > > > >>> Thanks,
>> > > > > > > > > >>> Damian
>> > > > > > > > > >>>
>> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
>> > > > ctippur@gmail.com
>> > > > > >
>> > > > > > > wrote:
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> Hello,
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> I am able to get the kstream to ktable join work. I
>> have
>> > > > some
>> > > > > > use
>> > > > > > > > > cases
>> > > > > > > > > >>>> where the key is not always a exact match.
>> > > > > > > > > >>>> I was wondering if there is a way to lookup keys
>> based
>> > on
>> > > > > regex.
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> For example,
>> > > > > > > > > >>>> I have these entries for a ktable:
>> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1":
>> "test1_l1" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1":
>> "test2_l2" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1":
>> "test3_l3" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> and this for a kstream:
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test
>> ":
>> > {
>> > > > > > > > > >> "creation_time ":
>> > > > > > > > > >>>> "1234 " } } }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> In this case, if the exact match does not work, I
>> would
>> > > like
>> > > > > to
>> > > > > > > lookup
>> > > > > > > > > >>>> ktable for all entries that contains "test_host*" in
>> it
>> > > and
>> > > > > have
>> > > > > > > > > >>>> application logic to determine what would be the best
>> > fit.
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> Appreciate input.
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> - Shekar
>> > > > > > > > > >>>>
>> > > > > > > > > >>
>> > > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>