Posted to users@kafka.apache.org by Ranjit Kumar <ra...@gmail.com> on 2017/11/09 10:51:57 UTC

kafka streams with multiple threads and state store

Hi All,

I want to use one state store across all of my Kafka Streams threads in my
application. How can I do that?

1. I created one topic (name: test2) with 3 partitions.
2. I wrote a Kafka Streams application in Java with num.stream.threads = 3.
3. I use a state store (name: count2) in my application.

However, the state store (count2) behaves as if it were local to each thread,
whereas I need it to be shared by the entire application so that every thread
sees the same values. How can I achieve that?

Do I also need to take care of any synchronization?

Code:
====
package com.javatpoint;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;

/**
 * Counts how often each value is seen on topic "test2", keeping the counts
 * in the "count2" state store (pre-1.0 Processor API).
 */
public class App {

    public static void main(String[] args) {
        // Alternative, serde-based way to build the same store:
        // StateStoreSupplier testStore = Stores.create("count2")
        //         .withKeys(Serdes.String())
        //         .withValues(Serdes.Long())
        //         .persistent()
        //         .build();
        StateStoreSupplier testStore = Stores.create("count2")
                .withStringKeys()
                .withLongValues()
                .persistent()
                .build();

        // TopologyBuilder builder = new TopologyBuilder();
        final KStreamBuilder builder = new KStreamBuilder();

        builder.addSource("source", "test2")
                .addProcessor("process", TestProcessor::new, "source")
                .addStateStore(testStore, "process");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        // props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());

        props.put("auto.offset.reset", "latest");
        props.put("num.stream.threads", 3);

        System.out.println("test1");
        KafkaStreams streams = new KafkaStreams(builder, props);
        System.out.println("test2");
        streams.start();
    }

    // public static class TestProcessor implements Processor<byte[], byte[]> {
    public static class TestProcessor implements Processor<String, String> {

        private KeyValueStore<String, Long> kvStore;
        private ProcessorContext context;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            System.out.println("Initialized");
            // Each task gets its own instance of the "count2" store.
            this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("count2");
        }

        @Override
        public void process(String k, String v) {
        // public void process(byte[] k, byte[] v) {
            System.out.println("Processing " + k + " -> " + v);
            try {
                Long oldValue = this.kvStore.get(v);
                System.out.println("Oldval " + oldValue + " -> Key " + v);
                if (oldValue == null) {
                    this.kvStore.put(v, 1L);
                } else {
                    this.kvStore.put(v, oldValue + 1L);
                }
                Thread.sleep(10000); // slow things down so the behaviour is easy to observe
            } catch (Exception e) {
                System.out.println(e);
            }
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }
}

Fwd: kafka streams with multiple threads and state store

Posted by Ranjit Kumar <ra...@gmail.com>.
Hi All,

I am new to Kafka.

I have some fundamental questions about choosing an architecture for my
requirement; please advise.

I am developing an IoT project. The details are below.

1. Moving objects continuously send their position plus some other
information to a central cloud every second, and an application VM running
on a cloud server receives the data.
2. For every moving object, the application VM needs to distribute that
object's information to all other objects within a 200-metre radius of it.

This is how I am approaching it; please suggest improvements.

My question is: *"How can I access a central state store from all threads,
so that every thread can see all the objects?"*

[image: Inline image 1]

The state store information used by the threads is very dynamic; every
thread needs a global DB, and it should be persistent.

Can you please provide me with example code for a global state store used
with a stream?

Each Kafka Streams thread will decode this data and do some processing as
well. Would this be a good design?

Thanks & Regards,
Ranjit

Re: kafka streams with multiple threads and state store

Posted by Damian Guy <da...@gmail.com>.
Hi Ranjit, it sounds like you might want to use a global table for this.
You can use StreamsBuilder#globalTable(String, Materialized) to create the
global table. You could do something like:

KeyValueBytesStoreSupplier supplier =
        Stores.inMemoryKeyValueStore("global-store");
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.as(supplier);
builder.globalTable("topic",
        materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
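
As a possible follow-up sketch (not part of the original reply): once the
global table is materialized as "global-store", any thread in the application
can read it through the interactive-queries API of the same 1.0-era release
that provides StreamsBuilder#globalTable. The store name and the String
key/value types are taken from the snippet above.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// ... topology built with the globalTable(...) call above ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Query only once the instance is RUNNING; before that,
// streams.store(...) throws InvalidStateStoreException.
ReadOnlyKeyValueStore<String, String> view =
        streams.store("global-store", QueryableStoreTypes.<String, String>keyValueStore());
String value = view.get("some-key");   // same read-only view from every thread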



Re: kafka streams with multiple threads and state store

Posted by Ranjit Kumar <ra...@gmail.com>.
Hi Guozhang,

Thanks for the information.

My requirement is something like this:

1. I want to read the data from one topic (which is fed continuously), so I
thought of using Kafka Streams with multiple threads.
2. I want to store the data in one in-memory database (not in a local data
store per thread).

If I have to write my own StateStore logic and handle the synchronization
myself, is that equivalent to having my own global data structure shared by
all threads?

Will there be any performance impact from doing our own synchronization?
Could you please share any sample programs or links describing this?

Thanks & Regards,
Ranjit
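
For concreteness, "my own global data structure in all threads" could be as
simple as a JVM-wide ConcurrentHashMap used directly from the processor
instead of a per-task state store. The sketch below is illustrative only
(the class name is invented, and it reuses the old Processor API from the
code earlier in the thread); such a map is shared by every stream thread
inside one JVM, but it is neither fault tolerant nor shared across multiple
application instances.

import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SharedMapProcessor implements Processor<String, String> {

    // One map per JVM, visible to all stream threads.
    private static final ConcurrentHashMap<String, Long> COUNTS = new ConcurrentHashMap<>();

    @Override
    public void init(ProcessorContext context) {
    }

    @Override
    public void process(String key, String value) {
        // Atomic read-modify-write, so no extra locking is needed here.
        long updated = COUNTS.merge(value, 1L, Long::sum);
        System.out.println("Count for " + value + " is now " + updated);
    }

    @Override
    public void punctuate(long timestamp) {
    }

    @Override
    public void close() {
    }
}

Whether something like this is acceptable depends on the fault-tolerance
trade-offs Guozhang touches on in his reply below.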


Re: kafka streams with multiple threads and state store

Posted by Guozhang Wang <wa...@gmail.com>.
Ranjit,

Note that the "testStore" instance you are passing is a StateStoreSupplier,
which will generate a new StateStore instance for each thread's task.

If you really want all the threads to share the same state store, you should
implement your own StateStoreSupplier that always returns the same StateStore
instance from its "get()" call; however, keep in mind that in this case the
state store could be accessed concurrently by multiple threads, which is not
protected by the library itself (by default, single-threaded access to the
state stores is guaranteed).
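
To make that concrete, a sketch of such a supplier could look like the
following. It assumes the pre-1.0 StateStoreSupplier interface used in the
original code (name(), get(), logConfig(), loggingEnabled(); the exact method
set varies slightly across 0.10.x releases), and the wrapper class is invented
for illustration: it memoizes the first store created by the supplier from
Stores.create(...) and hands that same instance to every task.

import java.util.Map;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;

// Hypothetical wrapper (not part of Kafka): every call to get() returns the
// same StateStore instance, so all tasks/threads end up sharing one store.
@SuppressWarnings("rawtypes")
public class SharedStoreSupplier implements StateStoreSupplier {

    private final StateStoreSupplier delegate;   // e.g. the supplier built by Stores.create("count2")...
    private StateStore shared;                   // the single shared instance

    public SharedStoreSupplier(StateStoreSupplier delegate) {
        this.delegate = delegate;
    }

    @Override
    public String name() {
        return delegate.name();
    }

    @Override
    public synchronized StateStore get() {
        if (shared == null) {
            shared = delegate.get();             // created once, handed to every task
        }
        return shared;
    }

    @Override
    public Map<String, String> logConfig() {
        return delegate.logConfig();
    }

    @Override
    public boolean loggingEnabled() {
        // Change-logging one store from several tasks at once is unlikely to
        // work as intended, so it is disabled in this sketch.
        return false;
    }
}

It would be registered with builder.addStateStore(new
SharedStoreSupplier(testStore), "process"). Keep in mind that init() and
close() are then invoked once per task on the same store object, and every
read-modify-write in the processor needs its own locking, since, as noted
above, the library does not protect a store shared this way.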


Guozhang
