Posted to users@kafka.apache.org by Shekar Tippur <ct...@gmail.com> on 2017/07/26 21:34:35 UTC

Kafka streams regex match

Hello,

I am able to get the KStream-to-KTable join to work. I have some use cases
where the key is not always an exact match.
I was wondering if there is a way to look up keys based on a regex.

For example,
I have these entries for a ktable:
test_host1,{ "source": "test_host", "UL1": "test1_l1" }

test_host2,{ "source": "test_host2", "UL1": "test2_l2" }

test_host3,{ "source": "test_host3", "UL1": "test3_l3" }

blah,{ "source": "blah_host", "UL1": "blah_l3" }

and this for a kstream:

test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ":
"1234 " } } }

In this case, if the exact match does not work, I would like to look up the
ktable for all entries that match "test_host*" and have
application logic determine the best fit.

Appreciate input.

- Shekar
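
What the rest of the thread converges on is a key-range scan over a
queryable state store rather than a true regex lookup. As a rough sketch
(not from the thread's gist), assuming the KTable is materialized in a store
named "local-store" as in the code quoted later, and that values are Jackson
JsonNode objects, a "test_host*" prefix lookup can be expressed as a range
from "test_host" (inclusive) up to the next lexicographic sibling:

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PrefixLookup {
    // "streams" is the already started KafkaStreams instance that owns "local-store".
    public static void lookupByPrefix(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, JsonNode> store =
                streams.store("local-store", QueryableStoreTypes.<String, JsonNode>keyValueStore());

        // Range scan standing in for the regex "test_host.*".
        try (KeyValueIterator<String, JsonNode> range = store.range("test_host", "test_hosu")) {
            while (range.hasNext()) {
                KeyValue<String, JsonNode> entry = range.next();
                // Application logic decides which candidate is the best fit.
                System.out.println(entry.key + " -> " + entry.value);
            }
        }
    }
}

If the store is not yet queryable (for example during a rebalance),
streams.store(...) throws InvalidStateStoreException and the call can simply
be retried, which is essentially what the IntegrationTestUtils helper used
later in the thread does.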

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
I am running this on a Mac laptop. I am using the defaults.

Sent from my iPhone

> On Aug 8, 2017, at 03:11, Damian Guy <da...@gmail.com> wrote:
> 
> Hi Shekar, that warning is expected during rebalances and should generally
> resolve itself.
> How many threads/app instances are you running?
> It is impossible to tell what is happening with the full logs.
> 
> Thanks,
> Damian
> 
>> On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ct...@gmail.com> wrote:
>> 
>> Damien,
>> 
>> Thanks for pointing out the error. I had tried a different version of
>> initializing the store.
>> 
>> Now that I am able to compile, I started to get the below error. I looked
>> up other suggestions for the same error and followed up to upgrade Kafka to
>> 0.11.0.0 version. I still get this error :/
>> 
>> [2017-08-07 14:40:41,264] WARN stream-thread
>> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
>> not create task 0_0. Will retry:
>> (org.apache.kafka.streams.processor.internals.StreamThread)
>> 
>> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
>> the state directory for task 0_0
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>> 
>>> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ct...@gmail.com> wrote:
>>> 
>>> Damian,
>>> 
>>> I am getting a syntax error. I have responded on gist.
>>> Appreciate any inputs.
>>> 
>>> - Shekar
>>> 
>>> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com>
>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I left a comment on your gist.
>>>> 
>>>> Thanks,
>>>> Damian
>>>> 
>>>>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
>>>>> 
>>>>> Damien,
>>>>> 
>>>>> Here is a public gist:
>>>>> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>>>>> 
>>>>> - Shekar
>>>>> 
>>>>> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> It might be easier if you make a github gist with your code. It is
>>>> quite
>>>>>> difficult to see what is happening in an email.
>>>>>> 
>>>>>> Cheers,
>>>>>> Damian
>>>>>> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Thanks a lot Damien.
>>>>>>> I am able to get to see if the join worked (using foreach). I
>> tried
>>>> to
>>>>>> add
>>>>>>> the logic to query the store after starting the streams:
>>>>>>> Looks like the code is not getting there. Here is the modified
>> code:
>>>>>>> 
>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>> 
>>>>>>> streams.start();
>>>>>>> 
>>>>>>> 
>>>>>>> parser.foreach(new ForeachAction<String, JsonNode>() {
>>>>>>>    @Override
>>>>>>>    public void apply(String key, JsonNode value) {
>>>>>>>        System.out.println(key + ": " + value);
>>>>>>>        if (value == null){
>>>>>>>            System.out.println("null match");
>>>>>>>            ReadOnlyKeyValueStore<String, Long> keyValueStore =
>>>>>>>                    null;
>>>>>>>            try {
>>>>>>>                keyValueStore =
>>>>>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>> QueryableStoreTypes.keyValueStore(), streams);
>>>>>>>            } catch (InterruptedException e) {
>>>>>>>                e.printStackTrace();
>>>>>>>            }
>>>>>>> 
>>>>>>>            KeyValueIterator  kviterator =
>>>>>>> keyValueStore.range("test_nod","test_node");
>>>>>>>        }
>>>>>>>    }
>>>>>>> });
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <
>> damian.guy@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> The store won't be queryable until after you have called
>>>>>> streams.start().
>>>>>>>> No stores have been created until the application is up and
>>>> running
>>>>> and
>>>>>>>> they are dependent on the underlying partitions.
>>>>>>>> 
>>>>>>>> To check that a stateful operation has produced a result you
>> would
>>>>>>> normally
>>>>>>>> add another operation after the join, i.e.,
>>>>>>>> stream.join(other,...).foreach(..) or
>> stream.join(other,...).to("
>>>>>> topic")
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>> 
>>>>>>>> On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> One more thing.. How do we check if the stateful join
>> operation
>>>>>>> resulted
>>>>>>>> in
>>>>>>>>> a kstream of some value in it (size of kstream)? How do we
>> check
>>>>> the
>>>>>>>>> content of a kstream?
>>>>>>>>> 
>>>>>>>>> - S
>>>>>>>>> 
>>>>>>>>> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
>>>> ctippur@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Damien,
>>>>>>>>>> 
>>>>>>>>>> Thanks a lot for pointing out.
>>>>>>>>>> 
>>>>>>>>>> I got a little further. I am kind of stuck with the
>>>> sequencing.
>>>>>>> Couple
>>>>>>>> of
>>>>>>>>>> issues:
>>>>>>>>>> 1. I cannot initialise KafkaStreams before the parser.to().
>>>>>>>>>> 2. Do I need to create a new KafkaStreams object when I
>>>> create a
>>>>>>>>>> KeyValueStore?
>>>>>>>>>> 3. How do I initialize KeyValueIterator with <String,
>>>> JsonNode> I
>>>>>>> seem
>>>>>>>> to
>>>>>>>>>> get a error when I try:
>>>>>>>>>> *KeyValueIterator <String,JsonNode> kviterator
>>>>>>>>>> = keyValueStore.range("test_nod","test_node");*
>>>>>>>>>> 
>>>>>>>>>> /////// START CODE /////////
>>>>>>>>>> //parser is a kstream as a result of join
>>>>>>>>>> if (parser.toString().matches("null")){
>>>>>>>>>> 
>>>>>>>>>>    ReadOnlyKeyValueStore<String, Long> keyValueStore =
>>>>>>>>>>            null;
>>>>>>>>>>    KafkaStreams newstreams = new KafkaStreams(builder,
>>>> props);
>>>>>>>>>>    try {
>>>>>>>>>>        keyValueStore =
>>>>>>>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>>>>> QueryableStoreTypes.keyValueStore(), newstreams);
>>>>>>>>>>    } catch (InterruptedException e) {
>>>>>>>>>>        e.printStackTrace();
>>>>>>>>>>    }
>>>>>>>>>> *    KeyValueIterator kviterator
>>>>>>>>>> = keyValueStore.range("test_nod","test_node");*
>>>>>>>>>> }else {
>>>>>>>>>> 
>>>>>>>>>> *    parser.to <http://parser.to>(stringSerde, jsonSerde,
>>>>>>> "parser");*}
>>>>>>>>>> 
>>>>>>>>>> *KafkaStreams streams = new KafkaStreams(builder, props);*
>>>>>>>>>> streams.start();
>>>>>>>>>> 
>>>>>>>>>> /////// END CODE /////////
>>>>>>>>>> 
>>>>>>>>>> - S
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
>>>>> damian.guy@gmail.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> It is part of the ReadOnlyKeyValueStore interface:
>>>>>>>>>>> 
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/
>>>>>>>>>> main/java/org/apache/kafka/streams/state/
>>>>>> ReadOnlyKeyValueStore.java
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
>>>> ctippur@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> That's cool. This feature is a part of rocksdb object
>> and
>>>> not
>>>>>>>> ktable?
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Jul 27, 2017, at 07:57, Damian Guy <
>>>>> damian.guy@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Yes they can be strings,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> so you could do something like:
>>>>>>>>>>>>> store.range("test_host", "test_hosu");
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This would return an iterator containing all of the
>>>> values
>>>>>>>>>> (inclusive)
>>>>>>>>>>>> from
>>>>>>>>>>>>> "test_host" -> "test_hosu".
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
>>>>>> ctippur@gmail.com
>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Can you please point me to an example? Can from and
>> to
>>>> be
>>>>> a
>>>>>>>>> string?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Jul 27, 2017, at 04:04, Damian Guy <
>>>>>> damian.guy@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> You can't use a regex, but you could use a range
>>>> query.
>>>>>>>>>>>>>>> i.e, keyValueStore.range(from, to)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
>>>>>>> ctippur@gmail.com
>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I am able to get the kstream to ktable join work. I
>>>> have
>>>>>>> some
>>>>>>>>> use
>>>>>>>>>>>> cases
>>>>>>>>>>>>>>>> where the key is not always a exact match.
>>>>>>>>>>>>>>>> I was wondering if there is a way to lookup keys
>>>> based
>>>>> on
>>>>>>>> regex.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> For example,
>>>>>>>>>>>>>>>> I have these entries for a ktable:
>>>>>>>>>>>>>>>> test_host1,{ "source": "test_host", "UL1":
>>>> "test1_l1" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host2,{ "source": "test_host2", "UL1":
>>>> "test2_l2" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host3,{ "source": "test_host3", "UL1":
>>>> "test3_l3" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> and this for a kstream:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host,{ "source": "test_host", "custom": {
>> "test
>>>> ":
>>>>> {
>>>>>>>>>>>>>> "creation_time ":
>>>>>>>>>>>>>>>> "1234 " } } }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In this case, if the exact match does not work, I
>>>> would
>>>>>> like
>>>>>>>> to
>>>>>>>>>> lookup
>>>>>>>>>>>>>>>> ktable for all entries that contains "test_host*"
>> in
>>>> it
>>>>>> and
>>>>>>>> have
>>>>>>>>>>>>>>>> application logic to determine what would be the
>> best
>>>>> fit.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Appreciate input.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> - Shekar
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
Hi Shekar, that warning is expected during rebalances and should generally
resolve itself.
How many threads/app instances are you running?
It is impossible to tell what is happening without the full logs.

Thanks,
Damian
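
For reference, the thread count and the state directory are ordinary
StreamsConfig settings. A minimal sketch of the relevant properties, with
hypothetical values (the application id "streams-pipe" is taken from the log
quoted below; everything else is illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties baseConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);               // default is 1
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");     // default location
        return props;
    }
}

Two instances of the same application on one machine sharing the same
state.dir can contend for the per-task state directory locks, which shows up
as the LockException quoted below.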

On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ct...@gmail.com> wrote:

> Damien,
>
> Thanks for pointing out the error. I had tried a different version of
> initializing the store.
>
> Now that I am able to compile, I started to get the below error. I looked
> up other suggestions for the same error and followed up to upgrade Kafka to
> 0.11.0.0 version. I still get this error :/
>
> [2017-08-07 14:40:41,264] WARN stream-thread
> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
> not create task 0_0. Will retry:
> (org.apache.kafka.streams.processor.internals.StreamThread)
>
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
> the state directory for task 0_0
>
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>
> at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>
> at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>
> at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>
> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Damian,
> >
> > I am getting a syntax error. I have responded on gist.
> > Appreciate any inputs.
> >
> > - Shekar
> >
> > On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> I left a comment on your gist.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
> >>
> >> > Damien,
> >> >
> >> > Here is a public gist:
> >> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >> >
> >> > - Shekar
> >> >
> >> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
> >> wrote:
> >> >
> >> > > It might be easier if you make a github gist with your code. It is
> >> quite
> >> > > difficult to see what is happening in an email.
> >> > >
> >> > > Cheers,
> >> > > Damian
> >> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Thanks a lot Damien.
> >> > > > I am able to get to see if the join worked (using foreach). I
> tried
> >> to
> >> > > add
> >> > > > the logic to query the store after starting the streams:
> >> > > > Looks like the code is not getting there. Here is the modified
> code:
> >> > > >
> >> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> >> > > >
> >> > > > streams.start();
> >> > > >
> >> > > >
> >> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> >> > > >     @Override
> >> > > >     public void apply(String key, JsonNode value) {
> >> > > >         System.out.println(key + ": " + value);
> >> > > >         if (value == null){
> >> > > >             System.out.println("null match");
> >> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >> > > >                     null;
> >> > > >             try {
> >> > > >                 keyValueStore =
> >> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > QueryableStoreTypes.keyValueStore(), streams);
> >> > > >             } catch (InterruptedException e) {
> >> > > >                 e.printStackTrace();
> >> > > >             }
> >> > > >
> >> > > >             KeyValueIterator  kviterator =
> >> > > > keyValueStore.range("test_nod","test_node");
> >> > > >         }
> >> > > >     }
> >> > > > });
> >> > > >
> >> > > >
> >> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <
> damian.guy@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > > The store won't be queryable until after you have called
> >> > > streams.start().
> >> > > > > No stores have been created until the application is up and
> >> running
> >> > and
> >> > > > > they are dependent on the underlying partitions.
> >> > > > >
> >> > > > > To check that a stateful operation has produced a result you
> would
> >> > > > normally
> >> > > > > add another operation after the join, i.e.,
> >> > > > > stream.join(other,...).foreach(..) or
> stream.join(other,...).to("
> >> > > topic")
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Damian
> >> > > > >
> >> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
> >> > wrote:
> >> > > > >
> >> > > > > > One more thing.. How do we check if the stateful join
> operation
> >> > > > resulted
> >> > > > > in
> >> > > > > > a kstream of some value in it (size of kstream)? How do we
> check
> >> > the
> >> > > > > > content of a kstream?
> >> > > > > >
> >> > > > > > - S
> >> > > > > >
> >> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
> >> ctippur@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Damien,
> >> > > > > > >
> >> > > > > > > Thanks a lot for pointing out.
> >> > > > > > >
> >> > > > > > > I got a little further. I am kind of stuck with the
> >> sequencing.
> >> > > > Couple
> >> > > > > of
> >> > > > > > > issues:
> >> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> >> > > > > > > 2. Do I need to create a new KafkaStreams object when I
> >> create a
> >> > > > > > > KeyValueStore?
> >> > > > > > > 3. How do I initialize KeyValueIterator with <String,
> >> JsonNode> I
> >> > > > seem
> >> > > > > to
> >> > > > > > > get a error when I try:
> >> > > > > > > *KeyValueIterator <String,JsonNode> kviterator
> >> > > > > > > = keyValueStore.range("test_nod","test_node");*
> >> > > > > > >
> >> > > > > > > /////// START CODE /////////
> >> > > > > > > //parser is a kstream as a result of join
> >> > > > > > > if (parser.toString().matches("null")){
> >> > > > > > >
> >> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >> > > > > > >             null;
> >> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder,
> >> props);
> >> > > > > > >     try {
> >> > > > > > >         keyValueStore =
> >> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> >> > > > > > >     } catch (InterruptedException e) {
> >> > > > > > >         e.printStackTrace();
> >> > > > > > >     }
> >> > > > > > > *    KeyValueIterator kviterator
> >> > > > > > > = keyValueStore.range("test_nod","test_node");*
> >> > > > > > > }else {
> >> > > > > > >
> >> > > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> >> > > > "parser");*}
> >> > > > > > >
> >> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> >> > > > > > > streams.start();
> >> > > > > > >
> >> > > > > > > /////// END CODE /////////
> >> > > > > > >
> >> > > > > > > - S
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> >> > damian.guy@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> >> > > > > > > >
> >> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> >> > > > > > > main/java/org/apache/kafka/streams/state/
> >> > > ReadOnlyKeyValueStore.java
> >> > > > > > > >
> >> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
> >> ctippur@gmail.com>
> >> > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > That's cool. This feature is a part of rocksdb object
> and
> >> not
> >> > > > > ktable?
> >> > > > > > > > >
> >> > > > > > > > > Sent from my iPhone
> >> > > > > > > > >
> >> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
> >> > damian.guy@gmail.com>
> >> > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > Yes they can be strings,
> >> > > > > > > > > >
> >> > > > > > > > > > so you could do something like:
> >> > > > > > > > > > store.range("test_host", "test_hosu");
> >> > > > > > > > > >
> >> > > > > > > > > > This would return an iterator containing all of the
> >> values
> >> > > > > > > (inclusive)
> >> > > > > > > > > from
> >> > > > > > > > > > "test_host" -> "test_hosu".
> >> > > > > > > > > >
> >> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
> >> > > ctippur@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > > > > >>
> >> > > > > > > > > >> Can you please point me to an example? Can from and
> to
> >> be
> >> > a
> >> > > > > > string?
> >> > > > > > > > > >>
> >> > > > > > > > > >> Sent from my iPhone
> >> > > > > > > > > >>
> >> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> >> > > damian.guy@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> Hi,
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> You can't use a regex, but you could use a range
> >> query.
> >> > > > > > > > > >>> i.e, keyValueStore.range(from, to)
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> Thanks,
> >> > > > > > > > > >>> Damian
> >> > > > > > > > > >>>
> >> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> >> > > > ctippur@gmail.com
> >> > > > > >
> >> > > > > > > wrote:
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> Hello,
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> I am able to get the kstream to ktable join work. I
> >> have
> >> > > > some
> >> > > > > > use
> >> > > > > > > > > cases
> >> > > > > > > > > >>>> where the key is not always a exact match.
> >> > > > > > > > > >>>> I was wondering if there is a way to lookup keys
> >> based
> >> > on
> >> > > > > regex.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> For example,
> >> > > > > > > > > >>>> I have these entries for a ktable:
> >> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1":
> >> "test1_l1" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1":
> >> "test2_l2" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1":
> >> "test3_l3" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> and this for a kstream:
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": {
> "test
> >> ":
> >> > {
> >> > > > > > > > > >> "creation_time ":
> >> > > > > > > > > >>>> "1234 " } } }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> In this case, if the exact match does not work, I
> >> would
> >> > > like
> >> > > > > to
> >> > > > > > > lookup
> >> > > > > > > > > >>>> ktable for all entries that contains "test_host*"
> in
> >> it
> >> > > and
> >> > > > > have
> >> > > > > > > > > >>>> application logic to determine what would be the
> best
> >> > fit.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> Appreciate input.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> - Shekar
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>
> >> > > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Damian,

Thanks for pointing out the error. I had tried a different way of
initializing the store.

Now that I am able to compile, I started to get the error below. I looked
up other suggestions for the same error and followed up by upgrading Kafka
to version 0.11.0.0. I still get this error :/

[2017-08-07 14:40:41,264] WARN stream-thread
[streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
not create task 0_0. Will retry:
(org.apache.kafka.streams.processor.internals.StreamThread)

org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
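
One possible source of this on a single development machine is a previous
run that never released the lock on the task's state directory. A rough
sketch (builder and props are the existing variables from the code earlier
in the thread) of shutting the instance down cleanly, plus an optional
local-state wipe, which can help rule that out:

import org.apache.kafka.streams.KafkaStreams;

KafkaStreams streams = new KafkaStreams(builder, props);

// Optional: delete this application's local state directory. Only safe while
// the instance is not running, and it discards any locally restored state.
streams.cleanUp();

streams.start();

// Release state-directory locks on shutdown by always closing the instance.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));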

On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ct...@gmail.com> wrote:

> Damian,
>
> I am getting a syntax error. I have responded on gist.
> Appreciate any inputs.
>
> - Shekar
>
> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com> wrote:
>
>> Hi,
>>
>> I left a comment on your gist.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
>>
>> > Damien,
>> >
>> > Here is a public gist:
>> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>> >
>> > - Shekar
>> >
>> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
>> wrote:
>> >
>> > > It might be easier if you make a github gist with your code. It is
>> quite
>> > > difficult to see what is happening in an email.
>> > >
>> > > Cheers,
>> > > Damian
>> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com>
>> wrote:
>> > >
>> > > > Thanks a lot Damien.
>> > > > I am able to get to see if the join worked (using foreach). I tried
>> to
>> > > add
>> > > > the logic to query the store after starting the streams:
>> > > > Looks like the code is not getting there. Here is the modified code:
>> > > >
>> > > > KafkaStreams streams = new KafkaStreams(builder, props);
>> > > >
>> > > > streams.start();
>> > > >
>> > > >
>> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
>> > > >     @Override
>> > > >     public void apply(String key, JsonNode value) {
>> > > >         System.out.println(key + ": " + value);
>> > > >         if (value == null){
>> > > >             System.out.println("null match");
>> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
>> > > >                     null;
>> > > >             try {
>> > > >                 keyValueStore =
>> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>> > > > QueryableStoreTypes.keyValueStore(), streams);
>> > > >             } catch (InterruptedException e) {
>> > > >                 e.printStackTrace();
>> > > >             }
>> > > >
>> > > >             KeyValueIterator  kviterator =
>> > > > keyValueStore.range("test_nod","test_node");
>> > > >         }
>> > > >     }
>> > > > });
>> > > >
>> > > >
>> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi,
>> > > > > The store won't be queryable until after you have called
>> > > streams.start().
>> > > > > No stores have been created until the application is up and
>> running
>> > and
>> > > > > they are dependent on the underlying partitions.
>> > > > >
>> > > > > To check that a stateful operation has produced a result you would
>> > > > normally
>> > > > > add another operation after the join, i.e.,
>> > > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
>> > > topic")
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
>> > wrote:
>> > > > >
>> > > > > > One more thing.. How do we check if the stateful join operation
>> > > > resulted
>> > > > > in
>> > > > > > a kstream of some value in it (size of kstream)? How do we check
>> > the
>> > > > > > content of a kstream?
>> > > > > >
>> > > > > > - S
>> > > > > >
>> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
>> ctippur@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Damien,
>> > > > > > >
>> > > > > > > Thanks a lot for pointing out.
>> > > > > > >
>> > > > > > > I got a little further. I am kind of stuck with the
>> sequencing.
>> > > > Couple
>> > > > > of
>> > > > > > > issues:
>> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
>> > > > > > > 2. Do I need to create a new KafkaStreams object when I
>> create a
>> > > > > > > KeyValueStore?
>> > > > > > > 3. How do I initialize KeyValueIterator with <String,
>> JsonNode> I
>> > > > seem
>> > > > > to
>> > > > > > > get a error when I try:
>> > > > > > > *KeyValueIterator <String,JsonNode> kviterator
>> > > > > > > = keyValueStore.range("test_nod","test_node");*
>> > > > > > >
>> > > > > > > /////// START CODE /////////
>> > > > > > > //parser is a kstream as a result of join
>> > > > > > > if (parser.toString().matches("null")){
>> > > > > > >
>> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
>> > > > > > >             null;
>> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder,
>> props);
>> > > > > > >     try {
>> > > > > > >         keyValueStore =
>> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
>> > > > > > >     } catch (InterruptedException e) {
>> > > > > > >         e.printStackTrace();
>> > > > > > >     }
>> > > > > > > *    KeyValueIterator kviterator
>> > > > > > > = keyValueStore.range("test_nod","test_node");*
>> > > > > > > }else {
>> > > > > > >
>> > > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
>> > > > "parser");*}
>> > > > > > >
>> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
>> > > > > > > streams.start();
>> > > > > > >
>> > > > > > > /////// END CODE /////////
>> > > > > > >
>> > > > > > > - S
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
>> > damian.guy@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
>> > > > > > > >
>> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
>> > > > > > > main/java/org/apache/kafka/streams/state/
>> > > ReadOnlyKeyValueStore.java
>> > > > > > > >
>> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
>> ctippur@gmail.com>
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > > That's cool. This feature is a part of rocksdb object and
>> not
>> > > > > ktable?
>> > > > > > > > >
>> > > > > > > > > Sent from my iPhone
>> > > > > > > > >
>> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
>> > damian.guy@gmail.com>
>> > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > Yes they can be strings,
>> > > > > > > > > >
>> > > > > > > > > > so you could do something like:
>> > > > > > > > > > store.range("test_host", "test_hosu");
>> > > > > > > > > >
>> > > > > > > > > > This would return an iterator containing all of the
>> values
>> > > > > > > (inclusive)
>> > > > > > > > > from
>> > > > > > > > > > "test_host" -> "test_hosu".
>> > > > > > > > > >
>> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
>> > > ctippur@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > > >>
>> > > > > > > > > >> Can you please point me to an example? Can from and to
>> be
>> > a
>> > > > > > string?
>> > > > > > > > > >>
>> > > > > > > > > >> Sent from my iPhone
>> > > > > > > > > >>
>> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
>> > > damian.guy@gmail.com>
>> > > > > > > wrote:
>> > > > > > > > > >>>
>> > > > > > > > > >>> Hi,
>> > > > > > > > > >>>
>> > > > > > > > > >>> You can't use a regex, but you could use a range
>> query.
>> > > > > > > > > >>> i.e, keyValueStore.range(from, to)
>> > > > > > > > > >>>
>> > > > > > > > > >>> Thanks,
>> > > > > > > > > >>> Damian
>> > > > > > > > > >>>
>> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
>> > > > ctippur@gmail.com
>> > > > > >
>> > > > > > > wrote:
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> Hello,
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> I am able to get the kstream to ktable join work. I
>> have
>> > > > some
>> > > > > > use
>> > > > > > > > > cases
>> > > > > > > > > >>>> where the key is not always a exact match.
>> > > > > > > > > >>>> I was wondering if there is a way to lookup keys
>> based
>> > on
>> > > > > regex.
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> For example,
>> > > > > > > > > >>>> I have these entries for a ktable:
>> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1":
>> "test1_l1" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1":
>> "test2_l2" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1":
>> "test3_l3" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> and this for a kstream:
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test
>> ":
>> > {
>> > > > > > > > > >> "creation_time ":
>> > > > > > > > > >>>> "1234 " } } }
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> In this case, if the exact match does not work, I
>> would
>> > > like
>> > > > > to
>> > > > > > > lookup
>> > > > > > > > > >>>> ktable for all entries that contains "test_host*" in
>> it
>> > > and
>> > > > > have
>> > > > > > > > > >>>> application logic to determine what would be the best
>> > fit.
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> Appreciate input.
>> > > > > > > > > >>>>
>> > > > > > > > > >>>> - Shekar
>> > > > > > > > > >>>>
>> > > > > > > > > >>
>> > > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Damian,

I am getting a syntax error. I have responded on the gist.
Appreciate any input.

- Shekar

On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <da...@gmail.com> wrote:

> Hi,
>
> I left a comment on your gist.
>
> Thanks,
> Damian
>
> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:
>
> > Damien,
> >
> > Here is a public gist:
> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >
> > - Shekar
> >
> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > It might be easier if you make a github gist with your code. It is
> quite
> > > difficult to see what is happening in an email.
> > >
> > > Cheers,
> > > Damian
> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com> wrote:
> > >
> > > > Thanks a lot Damien.
> > > > I am able to get to see if the join worked (using foreach). I tried
> to
> > > add
> > > > the logic to query the store after starting the streams:
> > > > Looks like the code is not getting there. Here is the modified code:
> > > >
> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > >
> > > > streams.start();
> > > >
> > > >
> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> > > >     @Override
> > > >     public void apply(String key, JsonNode value) {
> > > >         System.out.println(key + ": " + value);
> > > >         if (value == null){
> > > >             System.out.println("null match");
> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > >                     null;
> > > >             try {
> > > >                 keyValueStore =
> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > QueryableStoreTypes.keyValueStore(), streams);
> > > >             } catch (InterruptedException e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >
> > > >             KeyValueIterator  kviterator =
> > > > keyValueStore.range("test_nod","test_node");
> > > >         }
> > > >     }
> > > > });
> > > >
> > > >
> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > > The store won't be queryable until after you have called
> > > streams.start().
> > > > > No stores have been created until the application is up and running
> > and
> > > > > they are dependent on the underlying partitions.
> > > > >
> > > > > To check that a stateful operation has produced a result you would
> > > > normally
> > > > > add another operation after the join, i.e.,
> > > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> > > topic")
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
> > wrote:
> > > > >
> > > > > > One more thing.. How do we check if the stateful join operation
> > > > resulted
> > > > > in
> > > > > > a kstream of some value in it (size of kstream)? How do we check
> > the
> > > > > > content of a kstream?
> > > > > >
> > > > > > - S
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
> ctippur@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Damien,
> > > > > > >
> > > > > > > Thanks a lot for pointing out.
> > > > > > >
> > > > > > > I got a little further. I am kind of stuck with the sequencing.
> > > > Couple
> > > > > of
> > > > > > > issues:
> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > > 2. Do I need to create a new KafkaStreams object when I create
> a
> > > > > > > KeyValueStore?
> > > > > > > 3. How do I initialize KeyValueIterator with <String,
> JsonNode> I
> > > > seem
> > > > > to
> > > > > > > get a error when I try:
> > > > > > > *KeyValueIterator <String,JsonNode> kviterator
> > > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > >
> > > > > > > /////// START CODE /////////
> > > > > > > //parser is a kstream as a result of join
> > > > > > > if (parser.toString().matches("null")){
> > > > > > >
> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > > > > >             null;
> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > > >     try {
> > > > > > >         keyValueStore =
> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > > >     } catch (InterruptedException e) {
> > > > > > >         e.printStackTrace();
> > > > > > >     }
> > > > > > > *    KeyValueIterator kviterator
> > > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > > }else {
> > > > > > >
> > > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> > > > "parser");*}
> > > > > > >
> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > > > streams.start();
> > > > > > >
> > > > > > > /////// END CODE /////////
> > > > > > >
> > > > > > > - S
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> > damian.guy@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > > > main/java/org/apache/kafka/streams/state/
> > > ReadOnlyKeyValueStore.java
> > > > > > > >
> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
> ctippur@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > That's cool. This feature is a part of rocksdb object and
> not
> > > > > ktable?
> > > > > > > > >
> > > > > > > > > Sent from my iPhone
> > > > > > > > >
> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
> > damian.guy@gmail.com>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Yes they can be strings,
> > > > > > > > > >
> > > > > > > > > > so you could do something like:
> > > > > > > > > > store.range("test_host", "test_hosu");
> > > > > > > > > >
> > > > > > > > > > This would return an iterator containing all of the
> values
> > > > > > > (inclusive)
> > > > > > > > > from
> > > > > > > > > > "test_host" -> "test_hosu".
> > > > > > > > > >
> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
> > > ctippur@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> Can you please point me to an example? Can from and to
> be
> > a
> > > > > > string?
> > > > > > > > > >>
> > > > > > > > > >> Sent from my iPhone
> > > > > > > > > >>
> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> > > damian.guy@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >>>
> > > > > > > > > >>> Hi,
> > > > > > > > > >>>
> > > > > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > > > > > >>>
> > > > > > > > > >>> Thanks,
> > > > > > > > > >>> Damian
> > > > > > > > > >>>
> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> > > > ctippur@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > > > >>>>
> > > > > > > > > >>>> Hello,
> > > > > > > > > >>>>
> > > > > > > > > >>>> I am able to get the kstream to ktable join work. I
> have
> > > > some
> > > > > > use
> > > > > > > > > cases
> > > > > > > > > >>>> where the key is not always a exact match.
> > > > > > > > > >>>> I was wondering if there is a way to lookup keys based
> > on
> > > > > regex.
> > > > > > > > > >>>>
> > > > > > > > > >>>> For example,
> > > > > > > > > >>>> I have these entries for a ktable:
> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1"
> }
> > > > > > > > > >>>>
> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1":
> "test2_l2" }
> > > > > > > > > >>>>
> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1":
> "test3_l3" }
> > > > > > > > > >>>>
> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > > > > >>>>
> > > > > > > > > >>>> and this for a kstream:
> > > > > > > > > >>>>
> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test
> ":
> > {
> > > > > > > > > >> "creation_time ":
> > > > > > > > > >>>> "1234 " } } }
> > > > > > > > > >>>>
> > > > > > > > > >>>> In this case, if the exact match does not work, I
> would
> > > like
> > > > > to
> > > > > > > lookup
> > > > > > > > > >>>> ktable for all entries that contains "test_host*" in
> it
> > > and
> > > > > have
> > > > > > > > > >>>> application logic to determine what would be the best
> > fit.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Appreciate input.
> > > > > > > > > >>>>
> > > > > > > > > >>>> - Shekar
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
Hi,

I left a comment on your gist.

Thanks,
Damian

On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ct...@gmail.com> wrote:

> Damien,
>
> Here is a public gist:
> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>
> - Shekar
>
> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com> wrote:
>
> > It might be easier if you make a github gist with your code. It is quite
> > difficult to see what is happening in an email.
> >
> > Cheers,
> > Damian
> > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com> wrote:
> >
> > > Thanks a lot Damien.
> > > I am able to get to see if the join worked (using foreach). I tried to
> > add
> > > the logic to query the store after starting the streams:
> > > Looks like the code is not getting there. Here is the modified code:
> > >
> > > KafkaStreams streams = new KafkaStreams(builder, props);
> > >
> > > streams.start();
> > >
> > >
> > > parser.foreach(new ForeachAction<String, JsonNode>() {
> > >     @Override
> > >     public void apply(String key, JsonNode value) {
> > >         System.out.println(key + ": " + value);
> > >         if (value == null){
> > >             System.out.println("null match");
> > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > >                     null;
> > >             try {
> > >                 keyValueStore =
> > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > QueryableStoreTypes.keyValueStore(), streams);
> > >             } catch (InterruptedException e) {
> > >                 e.printStackTrace();
> > >             }
> > >
> > >             KeyValueIterator  kviterator =
> > > keyValueStore.range("test_nod","test_node");
> > >         }
> > >     }
> > > });
> > >
> > >
> > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > >
> > > > Hi,
> > > > The store won't be queryable until after you have called
> > streams.start().
> > > > No stores have been created until the application is up and running
> and
> > > > they are dependent on the underlying partitions.
> > > >
> > > > To check that a stateful operation has produced a result you would
> > > normally
> > > > add another operation after the join, i.e.,
> > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> > topic")
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com>
> wrote:
> > > >
> > > > > One more thing.. How do we check if the stateful join operation
> > > resulted
> > > > in
> > > > > a kstream of some value in it (size of kstream)? How do we check
> the
> > > > > content of a kstream?
> > > > >
> > > > > - S
> > > > >
> > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ct...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Damien,
> > > > > >
> > > > > > Thanks a lot for pointing out.
> > > > > >
> > > > > > I got a little further. I am kind of stuck with the sequencing.
> > > Couple
> > > > of
> > > > > > issues:
> > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > > > KeyValueStore?
> > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I
> > > seem
> > > > to
> > > > > > get a error when I try:
> > > > > > *KeyValueIterator <String,JsonNode> kviterator
> > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > >
> > > > > > /////// START CODE /////////
> > > > > > //parser is a kstream as a result of join
> > > > > > if (parser.toString().matches("null")){
> > > > > >
> > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > > > >             null;
> > > > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > >     try {
> > > > > >         keyValueStore =
> > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > >     } catch (InterruptedException e) {
> > > > > >         e.printStackTrace();
> > > > > >     }
> > > > > > *    KeyValueIterator kviterator
> > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > }else {
> > > > > >
> > > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> > > "parser");*}
> > > > > >
> > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > > streams.start();
> > > > > >
> > > > > > /////// END CODE /////////
> > > > > >
> > > > > > - S
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> damian.guy@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > > >
> > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > > main/java/org/apache/kafka/streams/state/
> > ReadOnlyKeyValueStore.java
> > > > > > >
> > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > That's cool. This feature is a part of rocksdb object and not
> > > > ktable?
> > > > > > > >
> > > > > > > > Sent from my iPhone
> > > > > > > >
> > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
> damian.guy@gmail.com>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Yes they can be strings,
> > > > > > > > >
> > > > > > > > > so you could do something like:
> > > > > > > > > store.range("test_host", "test_hosu");
> > > > > > > > >
> > > > > > > > > This would return an iterator containing all of the values
> > > > > > (inclusive)
> > > > > > > > from
> > > > > > > > > "test_host" -> "test_hosu".
> > > > > > > > >
> > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
> > ctippur@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> Can you please point me to an example? Can from and to be
> a
> > > > > string?
> > > > > > > > >>
> > > > > > > > >> Sent from my iPhone
> > > > > > > > >>
> > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> > damian.guy@gmail.com>
> > > > > > wrote:
> > > > > > > > >>>
> > > > > > > > >>> Hi,
> > > > > > > > >>>
> > > > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > > > > >>>
> > > > > > > > >>> Thanks,
> > > > > > > > >>> Damian
> > > > > > > > >>>
> > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> > > ctippur@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > > > >>>>
> > > > > > > > >>>> Hello,
> > > > > > > > >>>>
> > > > > > > > >>>> I am able to get the kstream to ktable join work. I have
> > > some
> > > > > use
> > > > > > > > cases
> > > > > > > > >>>> where the key is not always a exact match.
> > > > > > > > >>>> I was wondering if there is a way to lookup keys based
> on
> > > > regex.
> > > > > > > > >>>>
> > > > > > > > >>>> For example,
> > > > > > > > >>>> I have these entries for a ktable:
> > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > > > > > >>>>
> > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > > > > > >>>>
> > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > > > > > >>>>
> > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > > > >>>>
> > > > > > > > >>>> and this for a kstream:
> > > > > > > > >>>>
> > > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ":
> {
> > > > > > > > >> "creation_time ":
> > > > > > > > >>>> "1234 " } } }
> > > > > > > > >>>>
> > > > > > > > >>>> In this case, if the exact match does not work, I would
> > like
> > > > to
> > > > > > lookup
> > > > > > > > >>>> ktable for all entries that contains "test_host*" in it
> > and
> > > > have
> > > > > > > > >>>> application logic to determine what would be the best
> fit.
> > > > > > > > >>>>
> > > > > > > > >>>> Appreciate input.
> > > > > > > > >>>>
> > > > > > > > >>>> - Shekar
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Damian,

Here is a public gist:
https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8

- Shekar

On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <da...@gmail.com> wrote:

> It might be easier if you make a github gist with your code. It is quite
> difficult to see what is happening in an email.
>
> Cheers,
> Damian
> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Thanks a lot Damien.
> > I am able to get to see if the join worked (using foreach). I tried to
> add
> > the logic to query the store after starting the streams:
> > Looks like the code is not getting there. Here is the modified code:
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> >
> > streams.start();
> >
> >
> > parser.foreach(new ForeachAction<String, JsonNode>() {
> >     @Override
> >     public void apply(String key, JsonNode value) {
> >         System.out.println(key + ": " + value);
> >         if (value == null){
> >             System.out.println("null match");
> >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >                     null;
> >             try {
> >                 keyValueStore =
> > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > QueryableStoreTypes.keyValueStore(), streams);
> >             } catch (InterruptedException e) {
> >                 e.printStackTrace();
> >             }
> >
> >             KeyValueIterator  kviterator =
> > keyValueStore.range("test_nod","test_node");
> >         }
> >     }
> > });
> >
> >
> > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > Hi,
> > > The store won't be queryable until after you have called
> streams.start().
> > > No stores have been created until the application is up and running and
> > > they are dependent on the underlying partitions.
> > >
> > > To check that a stateful operation has produced a result you would
> > normally
> > > add another operation after the join, i.e.,
> > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> topic")
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com> wrote:
> > >
> > > > One more thing.. How do we check if the stateful join operation
> > resulted
> > > in
> > > > a kstream of some value in it (size of kstream)? How do we check the
> > > > content of a kstream?
> > > >
> > > > - S
> > > >
> > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ct...@gmail.com>
> > > wrote:
> > > >
> > > > > Damien,
> > > > >
> > > > > Thanks a lot for pointing out.
> > > > >
> > > > > I got a little further. I am kind of stuck with the sequencing.
> > Couple
> > > of
> > > > > issues:
> > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > > KeyValueStore?
> > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I
> > seem
> > > to
> > > > > get a error when I try:
> > > > > *KeyValueIterator <String,JsonNode> kviterator
> > > > > = keyValueStore.range("test_nod","test_node");*
> > > > >
> > > > > /////// START CODE /////////
> > > > > //parser is a kstream as a result of join
> > > > > if (parser.toString().matches("null")){
> > > > >
> > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > > >             null;
> > > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > >     try {
> > > > >         keyValueStore =
> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > >     } catch (InterruptedException e) {
> > > > >         e.printStackTrace();
> > > > >     }
> > > > > *    KeyValueIterator kviterator
> > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > }else {
> > > > >
> > > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> > "parser");*}
> > > > >
> > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > streams.start();
> > > > >
> > > > > /////// END CODE /////////
> > > > >
> > > > > - S
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian.guy@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > >
> > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > main/java/org/apache/kafka/streams/state/
> ReadOnlyKeyValueStore.java
> > > > > >
> > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > That's cool. This feature is a part of rocksdb object and not
> > > ktable?
> > > > > > >
> > > > > > > Sent from my iPhone
> > > > > > >
> > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > Yes they can be strings,
> > > > > > > >
> > > > > > > > so you could do something like:
> > > > > > > > store.range("test_host", "test_hosu");
> > > > > > > >
> > > > > > > > This would return an iterator containing all of the values
> > > > > (inclusive)
> > > > > > > from
> > > > > > > > "test_host" -> "test_hosu".
> > > > > > > >
> > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
> ctippur@gmail.com
> > >
> > > > > wrote:
> > > > > > > >>
> > > > > > > >> Can you please point me to an example? Can from and to be a
> > > > string?
> > > > > > > >>
> > > > > > > >> Sent from my iPhone
> > > > > > > >>
> > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> damian.guy@gmail.com>
> > > > > wrote:
> > > > > > > >>>
> > > > > > > >>> Hi,
> > > > > > > >>>
> > > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > > > >>>
> > > > > > > >>> Thanks,
> > > > > > > >>> Damian
> > > > > > > >>>
> > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> > ctippur@gmail.com
> > > >
> > > > > wrote:
> > > > > > > >>>>
> > > > > > > >>>> Hello,
> > > > > > > >>>>
> > > > > > > >>>> I am able to get the kstream to ktable join work. I have
> > some
> > > > use
> > > > > > > cases
> > > > > > > >>>> where the key is not always a exact match.
> > > > > > > >>>> I was wondering if there is a way to lookup keys based on
> > > regex.
> > > > > > > >>>>
> > > > > > > >>>> For example,
> > > > > > > >>>> I have these entries for a ktable:
> > > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > > > > >>>>
> > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > > > > >>>>
> > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > > > > >>>>
> > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > > >>>>
> > > > > > > >>>> and this for a kstream:
> > > > > > > >>>>
> > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > > > > > > >> "creation_time ":
> > > > > > > >>>> "1234 " } } }
> > > > > > > >>>>
> > > > > > > >>>> In this case, if the exact match does not work, I would
> like
> > > to
> > > > > lookup
> > > > > > > >>>> ktable for all entries that contains "test_host*" in it
> and
> > > have
> > > > > > > >>>> application logic to determine what would be the best fit.
> > > > > > > >>>>
> > > > > > > >>>> Appreciate input.
> > > > > > > >>>>
> > > > > > > >>>> - Shekar
> > > > > > > >>>>
> > > > > > > >>
> > > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
It might be easier if you make a github gist with your code. It is quite
difficult to see what is happening in an email.

Cheers,
Damian
On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ct...@gmail.com> wrote:

> Thanks a lot Damien.
> I am able to get to see if the join worked (using foreach). I tried to add
> the logic to query the store after starting the streams:
> Looks like the code is not getting there. Here is the modified code:
>
> KafkaStreams streams = new KafkaStreams(builder, props);
>
> streams.start();
>
>
> parser.foreach(new ForeachAction<String, JsonNode>() {
>     @Override
>     public void apply(String key, JsonNode value) {
>         System.out.println(key + ": " + value);
>         if (value == null){
>             System.out.println("null match");
>             ReadOnlyKeyValueStore<String, Long> keyValueStore =
>                     null;
>             try {
>                 keyValueStore =
> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> QueryableStoreTypes.keyValueStore(), streams);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>
>             KeyValueIterator  kviterator =
> keyValueStore.range("test_nod","test_node");
>         }
>     }
> });
>
>
> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi,
> > The store won't be queryable until after you have called streams.start().
> > No stores have been created until the application is up and running and
> > they are dependent on the underlying partitions.
> >
> > To check that a stateful operation has produced a result you would
> normally
> > add another operation after the join, i.e.,
> > stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
> >
> > Thanks,
> > Damian
> >
> > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com> wrote:
> >
> > > One more thing.. How do we check if the stateful join operation
> resulted
> > in
> > > a kstream of some value in it (size of kstream)? How do we check the
> > > content of a kstream?
> > >
> > > - S
> > >
> > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ct...@gmail.com>
> > wrote:
> > >
> > > > Damien,
> > > >
> > > > Thanks a lot for pointing out.
> > > >
> > > > I got a little further. I am kind of stuck with the sequencing.
> Couple
> > of
> > > > issues:
> > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > KeyValueStore?
> > > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I
> seem
> > to
> > > > get a error when I try:
> > > > *KeyValueIterator <String,JsonNode> kviterator
> > > > = keyValueStore.range("test_nod","test_node");*
> > > >
> > > > /////// START CODE /////////
> > > > //parser is a kstream as a result of join
> > > > if (parser.toString().matches("null")){
> > > >
> > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > >             null;
> > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > >     try {
> > > >         keyValueStore =
> > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > >     } catch (InterruptedException e) {
> > > >         e.printStackTrace();
> > > >     }
> > > > *    KeyValueIterator kviterator
> > > > = keyValueStore.range("test_nod","test_node");*
> > > > }else {
> > > >
> > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> "parser");*}
> > > >
> > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > streams.start();
> > > >
> > > > /////// END CODE /////////
> > > >
> > > > - S
> > > >
> > > >
> > > >
> > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <da...@gmail.com>
> > > wrote:
> > > > >
> > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > > >
> > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com>
> > wrote:
> > > > >
> > > > > > That's cool. This feature is a part of rocksdb object and not
> > ktable?
> > > > > >
> > > > > > Sent from my iPhone
> > > > > >
> > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > Yes they can be strings,
> > > > > > >
> > > > > > > so you could do something like:
> > > > > > > store.range("test_host", "test_hosu");
> > > > > > >
> > > > > > > This would return an iterator containing all of the values
> > > > (inclusive)
> > > > > > from
> > > > > > > "test_host" -> "test_hosu".
> > > > > > >
> > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctippur@gmail.com
> >
> > > > wrote:
> > > > > > >>
> > > > > > >> Can you please point me to an example? Can from and to be a
> > > string?
> > > > > > >>
> > > > > > >> Sent from my iPhone
> > > > > > >>
> > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com>
> > > > wrote:
> > > > > > >>>
> > > > > > >>> Hi,
> > > > > > >>>
> > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > > >>>
> > > > > > >>> Thanks,
> > > > > > >>> Damian
> > > > > > >>>
> > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> ctippur@gmail.com
> > >
> > > > wrote:
> > > > > > >>>>
> > > > > > >>>> Hello,
> > > > > > >>>>
> > > > > > >>>> I am able to get the kstream to ktable join work. I have
> some
> > > use
> > > > > > cases
> > > > > > >>>> where the key is not always a exact match.
> > > > > > >>>> I was wondering if there is a way to lookup keys based on
> > regex.
> > > > > > >>>>
> > > > > > >>>> For example,
> > > > > > >>>> I have these entries for a ktable:
> > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > > > >>>>
> > > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > > > >>>>
> > > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > > > >>>>
> > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > >>>>
> > > > > > >>>> and this for a kstream:
> > > > > > >>>>
> > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > > > > > >> "creation_time ":
> > > > > > >>>> "1234 " } } }
> > > > > > >>>>
> > > > > > >>>> In this case, if the exact match does not work, I would like
> > to
> > > > lookup
> > > > > > >>>> ktable for all entries that contains "test_host*" in it and
> > have
> > > > > > >>>> application logic to determine what would be the best fit.
> > > > > > >>>>
> > > > > > >>>> Appreciate input.
> > > > > > >>>>
> > > > > > >>>> - Shekar
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > >
> > >
> >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Thanks a lot, Damian.
I am able to see whether the join worked (using foreach). I tried to add
the logic to query the store after starting the streams, but it looks like
the code is not getting there. Here is the modified code:

KafkaStreams streams = new KafkaStreams(builder, props);

streams.start();


parser.foreach(new ForeachAction<String, JsonNode>() {
    @Override
    public void apply(String key, JsonNode value) {
        System.out.println(key + ": " + value);
        if (value == null){
            System.out.println("null match");
            ReadOnlyKeyValueStore<String, Long> keyValueStore =
                    null;
            try {
                keyValueStore =
IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
QueryableStoreTypes.keyValueStore(), streams);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            KeyValueIterator  kviterator =
keyValueStore.range("test_nod","test_node");
        }
    }
});
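
The snippet above relies on IntegrationTestUtils.waitUntilStoreIsQueryable,
which is a test helper that ships with the Streams test sources; outside of
tests, a small retry loop around streams.store() is a common equivalent. A
sketch, assuming the store is named "local-store" and holds JsonNode values
(adjust the generics to the store's actual types):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

static ReadOnlyKeyValueStore<String, JsonNode> waitForStore(KafkaStreams streams)
        throws InterruptedException {
    while (true) {
        try {
            // throws InvalidStateStoreException until the store is ready
            return streams.store("local-store",
                    QueryableStoreTypes.<String, JsonNode>keyValueStore());
        } catch (InvalidStateStoreException notReadyYet) {
            Thread.sleep(100);  // e.g. still rebalancing or restoring state
        }
    }
}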


On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <da...@gmail.com> wrote:

> Hi,
> The store won't be queryable until after you have called streams.start().
> No stores have been created until the application is up and running and
> they are dependent on the underlying partitions.
>
> To check that a stateful operation has produced a result you would normally
> add another operation after the join, i.e.,
> stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
>
> Thanks,
> Damian
>
> On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com> wrote:
>
> > One more thing.. How do we check if the stateful join operation resulted
> in
> > a kstream of some value in it (size of kstream)? How do we check the
> > content of a kstream?
> >
> > - S
> >
> > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ct...@gmail.com>
> wrote:
> >
> > > Damien,
> > >
> > > Thanks a lot for pointing out.
> > >
> > > I got a little further. I am kind of stuck with the sequencing. Couple
> of
> > > issues:
> > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > 2. Do I need to create a new KafkaStreams object when I create a
> > > KeyValueStore?
> > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I seem
> to
> > > get a error when I try:
> > > *KeyValueIterator <String,JsonNode> kviterator
> > > = keyValueStore.range("test_nod","test_node");*
> > >
> > > /////// START CODE /////////
> > > //parser is a kstream as a result of join
> > > if (parser.toString().matches("null")){
> > >
> > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > >             null;
> > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > >     try {
> > >         keyValueStore =
> > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > QueryableStoreTypes.keyValueStore(), newstreams);
> > >     } catch (InterruptedException e) {
> > >         e.printStackTrace();
> > >     }
> > > *    KeyValueIterator kviterator
> > > = keyValueStore.range("test_nod","test_node");*
> > > }else {
> > >
> > > *    parser.to <http://parser.to>(stringSerde, jsonSerde, "parser");*}
> > >
> > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > streams.start();
> > >
> > > /////// END CODE /////////
> > >
> > > - S
> > >
> > >
> > >
> > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > > >
> > > > It is part of the ReadOnlyKeyValueStore interface:
> > > >
> > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > >
> > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com>
> wrote:
> > > >
> > > > > That's cool. This feature is a part of rocksdb object and not
> ktable?
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com>
> > wrote:
> > > > > >
> > > > > > Yes they can be strings,
> > > > > >
> > > > > > so you could do something like:
> > > > > > store.range("test_host", "test_hosu");
> > > > > >
> > > > > > This would return an iterator containing all of the values
> > > (inclusive)
> > > > > from
> > > > > > "test_host" -> "test_hosu".
> > > > > >
> > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >> Can you please point me to an example? Can from and to be a
> > string?
> > > > > >>
> > > > > >> Sent from my iPhone
> > > > > >>
> > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com>
> > > wrote:
> > > > > >>>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> You can't use a regex, but you could use a range query.
> > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Damian
> > > > > >>>
> > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ctippur@gmail.com
> >
> > > wrote:
> > > > > >>>>
> > > > > >>>> Hello,
> > > > > >>>>
> > > > > >>>> I am able to get the kstream to ktable join work. I have some
> > use
> > > > > cases
> > > > > >>>> where the key is not always a exact match.
> > > > > >>>> I was wondering if there is a way to lookup keys based on
> regex.
> > > > > >>>>
> > > > > >>>> For example,
> > > > > >>>> I have these entries for a ktable:
> > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > > >>>>
> > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > > >>>>
> > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > > >>>>
> > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > >>>>
> > > > > >>>> and this for a kstream:
> > > > > >>>>
> > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > > > > >> "creation_time ":
> > > > > >>>> "1234 " } } }
> > > > > >>>>
> > > > > >>>> In this case, if the exact match does not work, I would like
> to
> > > lookup
> > > > > >>>> ktable for all entries that contains "test_host*" in it and
> have
> > > > > >>>> application logic to determine what would be the best fit.
> > > > > >>>>
> > > > > >>>> Appreciate input.
> > > > > >>>>
> > > > > >>>> - Shekar
> > > > > >>>>
> > > > > >>
> > > > >
> > >
> >
>

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
Hi,
The store won't be queryable until after you have called streams.start().
No stores have been created until the application is up and running and
they are dependent on the underlying partitions.

To check that a stateful operation has produced a result you would normally
add another operation after the join, i.e.,
stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")

Thanks,
Damian
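
Putting those two points together, the rough shape of the program would be as
below. This is a sketch that reuses the names from this thread (parser for
the joined KStream, "local-store" for the KTable's store, JsonNode values),
not a drop-in implementation:

// 1. build the topology, including something downstream of the join
parser.foreach((key, value) -> System.out.println(key + ": " + value));
parser.to(stringSerde, jsonSerde, "parser");   // and/or write the join result out

// 2. only then create and start the application
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// 3. query the store only after start(); it may need a retry while the
//    instance is still initializing or rebalancing
ReadOnlyKeyValueStore<String, JsonNode> store =
        streams.store("local-store", QueryableStoreTypes.<String, JsonNode>keyValueStore());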

On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ct...@gmail.com> wrote:

> One more thing.. How do we check if the stateful join operation resulted in
> a kstream of some value in it (size of kstream)? How do we check the
> content of a kstream?
>
> - S
>
> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Damien,
> >
> > Thanks a lot for pointing out.
> >
> > I got a little further. I am kind of stuck with the sequencing. Couple of
> > issues:
> > 1. I cannot initialise KafkaStreams before the parser.to().
> > 2. Do I need to create a new KafkaStreams object when I create a
> > KeyValueStore?
> > 3. How do I initialize KeyValueIterator with <String, JsonNode> I seem to
> > get a error when I try:
> > *KeyValueIterator <String,JsonNode> kviterator
> > = keyValueStore.range("test_nod","test_node");*
> >
> > /////// START CODE /////////
> > //parser is a kstream as a result of join
> > if (parser.toString().matches("null")){
> >
> >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >             null;
> >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> >     try {
> >         keyValueStore =
> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > QueryableStoreTypes.keyValueStore(), newstreams);
> >     } catch (InterruptedException e) {
> >         e.printStackTrace();
> >     }
> > *    KeyValueIterator kviterator
> > = keyValueStore.range("test_nod","test_node");*
> > }else {
> >
> > *    parser.to <http://parser.to>(stringSerde, jsonSerde, "parser");*}
> >
> > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > streams.start();
> >
> > /////// END CODE /////////
> >
> > - S
> >
> >
> >
> > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <da...@gmail.com>
> wrote:
> > >
> > > It is part of the ReadOnlyKeyValueStore interface:
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > >
> > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com> wrote:
> > >
> > > > That's cool. This feature is a part of rocksdb object and not ktable?
> > > >
> > > > Sent from my iPhone
> > > >
> > > > > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com>
> wrote:
> > > > >
> > > > > Yes they can be strings,
> > > > >
> > > > > so you could do something like:
> > > > > store.range("test_host", "test_hosu");
> > > > >
> > > > > This would return an iterator containing all of the values
> > (inclusive)
> > > > from
> > > > > "test_host" -> "test_hosu".
> > > > >
> > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com>
> > wrote:
> > > > >>
> > > > >> Can you please point me to an example? Can from and to be a
> string?
> > > > >>
> > > > >> Sent from my iPhone
> > > > >>
> > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com>
> > wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> You can't use a regex, but you could use a range query.
> > > > >>> i.e, keyValueStore.range(from, to)
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Damian
> > > > >>>
> > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com>
> > wrote:
> > > > >>>>
> > > > >>>> Hello,
> > > > >>>>
> > > > >>>> I am able to get the kstream to ktable join work. I have some
> use
> > > > cases
> > > > >>>> where the key is not always a exact match.
> > > > >>>> I was wondering if there is a way to lookup keys based on regex.
> > > > >>>>
> > > > >>>> For example,
> > > > >>>> I have these entries for a ktable:
> > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > >>>>
> > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > >>>>
> > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > >>>>
> > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > >>>>
> > > > >>>> and this for a kstream:
> > > > >>>>
> > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > > > >> "creation_time ":
> > > > >>>> "1234 " } } }
> > > > >>>>
> > > > >>>> In this case, if the exact match does not work, I would like to
> > lookup
> > > > >>>> ktable for all entries that contains "test_host*" in it and have
> > > > >>>> application logic to determine what would be the best fit.
> > > > >>>>
> > > > >>>> Appreciate input.
> > > > >>>>
> > > > >>>> - Shekar
> > > > >>>>
> > > > >>
> > > >
> >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
One more thing: how do we check whether the stateful join operation resulted
in a KStream that actually contains values (i.e. its size)? And how do we
check the contents of a KStream?

- S

On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ct...@gmail.com> wrote:

> Damien,
>
> Thanks a lot for pointing out.
>
> I got a little further. I am kind of stuck with the sequencing. Couple of
> issues:
> 1. I cannot initialise KafkaStreams before the parser.to().
> 2. Do I need to create a new KafkaStreams object when I create a
> KeyValueStore?
> 3. How do I initialize KeyValueIterator with <String, JsonNode> I seem to
> get a error when I try:
> *KeyValueIterator <String,JsonNode> kviterator
> = keyValueStore.range("test_nod","test_node");*
>
> /////// START CODE /////////
> //parser is a kstream as a result of join
> if (parser.toString().matches("null")){
>
>     ReadOnlyKeyValueStore<String, Long> keyValueStore =
>             null;
>     KafkaStreams newstreams = new KafkaStreams(builder, props);
>     try {
>         keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> QueryableStoreTypes.keyValueStore(), newstreams);
>     } catch (InterruptedException e) {
>         e.printStackTrace();
>     }
> *    KeyValueIterator kviterator
> = keyValueStore.range("test_nod","test_node");*
> }else {
>
> *    parser.to <http://parser.to>(stringSerde, jsonSerde, "parser");*}
>
> *KafkaStreams streams = new KafkaStreams(builder, props);*
> streams.start();
>
> /////// END CODE /////////
>
> - S
>
>
>
> On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <da...@gmail.com> wrote:
> >
> > It is part of the ReadOnlyKeyValueStore interface:
> >
> > https://github.com/apache/kafka/blob/trunk/streams/src/
> main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> >
> > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com> wrote:
> >
> > > That's cool. This feature is a part of rocksdb object and not ktable?
> > >
> > > Sent from my iPhone
> > >
> > > > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com> wrote:
> > > >
> > > > Yes they can be strings,
> > > >
> > > > so you could do something like:
> > > > store.range("test_host", "test_hosu");
> > > >
> > > > This would return an iterator containing all of the values
> (inclusive)
> > > from
> > > > "test_host" -> "test_hosu".
> > > >
> > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com>
> wrote:
> > > >>
> > > >> Can you please point me to an example? Can from and to be a string?
> > > >>
> > > >> Sent from my iPhone
> > > >>
> > > >>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com>
> wrote:
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>> You can't use a regex, but you could use a range query.
> > > >>> i.e, keyValueStore.range(from, to)
> > > >>>
> > > >>> Thanks,
> > > >>> Damian
> > > >>>
> > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com>
> wrote:
> > > >>>>
> > > >>>> Hello,
> > > >>>>
> > > >>>> I am able to get the kstream to ktable join work. I have some use
> > > cases
> > > >>>> where the key is not always a exact match.
> > > >>>> I was wondering if there is a way to lookup keys based on regex.
> > > >>>>
> > > >>>> For example,
> > > >>>> I have these entries for a ktable:
> > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > >>>>
> > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > >>>>
> > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > >>>>
> > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > >>>>
> > > >>>> and this for a kstream:
> > > >>>>
> > > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > > >> "creation_time ":
> > > >>>> "1234 " } } }
> > > >>>>
> > > >>>> In this case, if the exact match does not work, I would like to
> lookup
> > > >>>> ktable for all entries that contains "test_host*" in it and have
> > > >>>> application logic to determine what would be the best fit.
> > > >>>>
> > > >>>> Appreciate input.
> > > >>>>
> > > >>>> - Shekar
> > > >>>>
> > > >>
> > >
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Damian,

Thanks a lot for pointing that out.

I got a little further, but I am stuck on the sequencing. A couple of
issues:
1. I cannot initialise KafkaStreams before the parser.to().
2. Do I need to create a new KafkaStreams object when I create a
KeyValueStore?
3. How do I initialize KeyValueIterator with <String, JsonNode>? I seem to
get an error when I try:
KeyValueIterator<String, JsonNode> kviterator
        = keyValueStore.range("test_nod", "test_node");

/////// START CODE /////////
//parser is a kstream as a result of join
if (parser.toString().matches("null")){

    ReadOnlyKeyValueStore<String, Long> keyValueStore =
            null;
    KafkaStreams newstreams = new KafkaStreams(builder, props);
    try {
        keyValueStore =
IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
QueryableStoreTypes.keyValueStore(), newstreams);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    KeyValueIterator kviterator
            = keyValueStore.range("test_nod", "test_node");
} else {

    parser.to(stringSerde, jsonSerde, "parser");
}

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

/////// END CODE /////////

- S


On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <da...@gmail.com> wrote:
>
> It is part of the ReadOnlyKeyValueStore interface:
>
>
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
>
> On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com> wrote:
>
> > That's cool. This feature is a part of rocksdb object and not ktable?
> >
> > Sent from my iPhone
> >
> > > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com> wrote:
> > >
> > > Yes they can be strings,
> > >
> > > so you could do something like:
> > > store.range("test_host", "test_hosu");
> > >
> > > This would return an iterator containing all of the values (inclusive)
> > from
> > > "test_host" -> "test_hosu".
> > >
> > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com> wrote:
> > >>
> > >> Can you please point me to an example? Can from and to be a string?
> > >>
> > >> Sent from my iPhone
> > >>
> > >>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> You can't use a regex, but you could use a range query.
> > >>> i.e, keyValueStore.range(from, to)
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com>
wrote:
> > >>>>
> > >>>> Hello,
> > >>>>
> > >>>> I am able to get the kstream to ktable join work. I have some use
> > cases
> > >>>> where the key is not always a exact match.
> > >>>> I was wondering if there is a way to lookup keys based on regex.
> > >>>>
> > >>>> For example,
> > >>>> I have these entries for a ktable:
> > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > >>>>
> > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > >>>>
> > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > >>>>
> > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > >>>>
> > >>>> and this for a kstream:
> > >>>>
> > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > >> "creation_time ":
> > >>>> "1234 " } } }
> > >>>>
> > >>>> In this case, if the exact match does not work, I would like to
lookup
> > >>>> ktable for all entries that contains "test_host*" in it and have
> > >>>> application logic to determine what would be the best fit.
> > >>>>
> > >>>> Appreciate input.
> > >>>>
> > >>>> - Shekar
> > >>>>
> > >>
> >

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
It is part of the ReadOnlyKeyValueStore interface:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
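
For reference, the methods on that interface relevant to this thread are
roughly the following (see the linked source for the exact signatures and
javadoc):

// ReadOnlyKeyValueStore<K, V>: read-only view over a queryable state store
V get(K key);                                // exact-key lookup
KeyValueIterator<K, V> range(K from, K to);  // all entries with from <= key <= to
KeyValueIterator<K, V> all();                // every entry in the store
long approximateNumEntries();                // rough size of the store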

On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ct...@gmail.com> wrote:

> That's cool. This feature is a part of rocksdb object and not ktable?
>
> Sent from my iPhone
>
> > On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com> wrote:
> >
> > Yes they can be strings,
> >
> > so you could do something like:
> > store.range("test_host", "test_hosu");
> >
> > This would return an iterator containing all of the values (inclusive)
> from
> > "test_host" -> "test_hosu".
> >
> >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com> wrote:
> >>
> >> Can you please point me to an example? Can from and to be a string?
> >>
> >> Sent from my iPhone
> >>
> >>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> You can't use a regex, but you could use a range query.
> >>> i.e, keyValueStore.range(from, to)
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I am able to get the kstream to ktable join work. I have some use
> cases
> >>>> where the key is not always a exact match.
> >>>> I was wondering if there is a way to lookup keys based on regex.
> >>>>
> >>>> For example,
> >>>> I have these entries for a ktable:
> >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> >>>>
> >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> >>>>
> >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> >>>>
> >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> >>>>
> >>>> and this for a kstream:
> >>>>
> >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> >> "creation_time ":
> >>>> "1234 " } } }
> >>>>
> >>>> In this case, if the exact match does not work, I would like to lookup
> >>>> ktable for all entries that contains "test_host*" in it and have
> >>>> application logic to determine what would be the best fit.
> >>>>
> >>>> Appreciate input.
> >>>>
> >>>> - Shekar
> >>>>
> >>
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
That's cool. So this feature is part of the RocksDB store object and not of the KTable itself?

Sent from my iPhone

> On Jul 27, 2017, at 07:57, Damian Guy <da...@gmail.com> wrote:
> 
> Yes they can be strings,
> 
> so you could do something like:
> store.range("test_host", "test_hosu");
> 
> This would return an iterator containing all of the values (inclusive) from
> "test_host" -> "test_hosu".
> 
>> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com> wrote:
>> 
>> Can you please point me to an example? Can from and to be a string?
>> 
>> Sent from my iPhone
>> 
>>> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> You can't use a regex, but you could use a range query.
>>> i.e, keyValueStore.range(from, to)
>>> 
>>> Thanks,
>>> Damian
>>> 
>>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> I am able to get the kstream to ktable join work. I have some use cases
>>>> where the key is not always a exact match.
>>>> I was wondering if there is a way to lookup keys based on regex.
>>>> 
>>>> For example,
>>>> I have these entries for a ktable:
>>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
>>>> 
>>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
>>>> 
>>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
>>>> 
>>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>>>> 
>>>> and this for a kstream:
>>>> 
>>>> test_host,{ "source": "test_host", "custom": { "test ": {
>> "creation_time ":
>>>> "1234 " } } }
>>>> 
>>>> In this case, if the exact match does not work, I would like to lookup
>>>> ktable for all entries that contains "test_host*" in it and have
>>>> application logic to determine what would be the best fit.
>>>> 
>>>> Appreciate input.
>>>> 
>>>> - Shekar
>>>> 
>> 

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
Yes, they can be strings,

so you could do something like:
store.range("test_host", "test_hosu");

This would return an iterator containing all of the values (inclusive) from
"test_host" -> "test_hosu".

On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ct...@gmail.com> wrote:

> Can you please point me to an example? Can from and to be a string?
>
> Sent from my iPhone
>
> > On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com> wrote:
> >
> > Hi,
> >
> > You can't use a regex, but you could use a range query.
> > i.e, keyValueStore.range(from, to)
> >
> > Thanks,
> > Damian
> >
> >> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com> wrote:
> >>
> >> Hello,
> >>
> >> I am able to get the kstream to ktable join work. I have some use cases
> >> where the key is not always a exact match.
> >> I was wondering if there is a way to lookup keys based on regex.
> >>
> >> For example,
> >> I have these entries for a ktable:
> >> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> >>
> >> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> >>
> >> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> >>
> >> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> >>
> >> and this for a kstream:
> >>
> >> test_host,{ "source": "test_host", "custom": { "test ": {
> "creation_time ":
> >> "1234 " } } }
> >>
> >> In this case, if the exact match does not work, I would like to lookup
> >> ktable for all entries that contains "test_host*" in it and have
> >> application logic to determine what would be the best fit.
> >>
> >> Appreciate input.
> >>
> >> - Shekar
> >>
>

Re: Kafka streams regex match

Posted by Shekar Tippur <ct...@gmail.com>.
Can you please point me to an example? Can from and to be strings?

Sent from my iPhone

> On Jul 27, 2017, at 04:04, Damian Guy <da...@gmail.com> wrote:
> 
> Hi,
> 
> You can't use a regex, but you could use a range query.
> i.e, keyValueStore.range(from, to)
> 
> Thanks,
> Damian
> 
>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com> wrote:
>> 
>> Hello,
>> 
>> I am able to get the kstream to ktable join work. I have some use cases
>> where the key is not always a exact match.
>> I was wondering if there is a way to lookup keys based on regex.
>> 
>> For example,
>> I have these entries for a ktable:
>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
>> 
>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
>> 
>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
>> 
>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>> 
>> and this for a kstream:
>> 
>> test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ":
>> "1234 " } } }
>> 
>> In this case, if the exact match does not work, I would like to lookup
>> ktable for all entries that contains "test_host*" in it and have
>> application logic to determine what would be the best fit.
>> 
>> Appreciate input.
>> 
>> - Shekar
>> 

Re: Kafka streams regex match

Posted by Damian Guy <da...@gmail.com>.
Hi,

You can't use a regex, but you could use a range query.
i.e., keyValueStore.range(from, to)

Thanks,
Damian
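
A minimal sketch of that range-query approach, assuming "streams" is the
running KafkaStreams instance and the KTable is backed by a store named
"local-store" with String keys and Jackson JsonNode values (names taken from
this thread), could look like the following. It is an illustration, not code
posted in the thread:

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// the store is only queryable once streams.start() has been called and the
// instance has finished initializing
ReadOnlyKeyValueStore<String, JsonNode> store =
        streams.store("local-store", QueryableStoreTypes.<String, JsonNode>keyValueStore());

// every key from "test_host" to "test_hosu" (both ends inclusive), which
// approximates a "test_host*" prefix lookup
try (KeyValueIterator<String, JsonNode> range = store.range("test_host", "test_hosu")) {
    while (range.hasNext()) {
        KeyValue<String, JsonNode> entry = range.next();
        // application logic picks whichever candidate key is the best fit
        System.out.println(entry.key + " -> " + entry.value);
    }
}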

On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ct...@gmail.com> wrote:

> Hello,
>
> I am able to get the kstream to ktable join work. I have some use cases
> where the key is not always a exact match.
> I was wondering if there is a way to lookup keys based on regex.
>
> For example,
> I have these entries for a ktable:
> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
>
> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
>
> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
>
> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>
> and this for a kstream:
>
> test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ":
> "1234 " } } }
>
> In this case, if the exact match does not work, I would like to lookup
> ktable for all entries that contains "test_host*" in it and have
> application logic to determine what would be the best fit.
>
> Appreciate input.
>
> - Shekar
>