Posted to user@ignite.apache.org by vbm <bm...@gmail.com> on 2018/03/12 08:34:54 UTC

How to achieve writethrough with ignite data streamer

Hi,

I am using the Ignite data streamer to pull data from a Kafka topic.
The data is loaded into the Ignite cache, but it is not written
to the third-party persistence store (a MySQL database).

I have set the cacheStoreFactory to my custom class, which extends the
CacheStoreAdapter class.

Code Snippet:

   
    myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));
    // Set as write-through cache
    myCacheCfg.setWriteThrough(true);

    try (IgniteDataStreamer<Long, MyOrders> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer = new KafkaStreamer<>();
        kafkaStreamer.setIgnite(ignite);

        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic(topic);

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        Properties settings = new Properties();

        kafkaStreamer.setConsumerConfig(new ConsumerConfig(settings));
        kafkaStreamer.setMultipleTupleExtractor(
            new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, Long, MyOrders>() {
                @Override public Map<Long, MyOrders> extract(MessageAndMetadata<byte[], byte[]> msg) {
                    Map<Long, MyOrders> entries = new HashMap<>();

                    try {
                        // Convert the received message to my requirement
                        // and add it to the map
                        entries.put(key, order);
                    }
                    catch (Exception ex) {
                        ex.printStackTrace();
                    }

                    return entries;
                }
            });

With this code, I am able to get the data into the cache, but the
write-through behaviour is not triggered. The methods of the class I set as
cacheStoreFactory (write, load) are not called.


Can anyone help me with this and let me know how to achieve the
write-through behaviour?


Regards,
Vishwas



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: How to achieve writethrough with ignite data streamer

Posted by vkulichenko <va...@gmail.com>.
Hm.. Not sure what exactly happened in your case, but a cache store is never
deployed via peer class loading. The class must be explicitly deployed on
every node prior to startup.

-Val




Re: How to achieve writethrough with ignite data streamer

Posted by vbm <bm...@gmail.com>.
Here the issue was with the peerClassLoading flag.

I had not enabled it on the server. As per the server logs, the cache store
factory class was not getting loaded.
I enabled it and now write-through works.




Re: How to achieve writethrough with ignite data streamer

Posted by ezhuravlev <e....@gmail.com>.
If the skipStore flag is enabled, data won't be propagated to the store.

Are you sure that you have the writeThrough flag enabled?

Additionally, you need to check your CacheStore implementation. If you don't
have any ideas, you can post it here and the community will check it.

Evgenii




Re: How to achieve writethrough with ignite data streamer

Posted by vbm <bm...@gmail.com>.
I have set the allowOverwrite flag to true:
stmr.allowOverwrite(true);

What is the significance of the skipStore flag?
What is the flow for an entry from the extractor set via
setMultipleTupleExtractor to reach the cache?

I am thinking it should go through the write method, by which it gets put
into the cache. I have overridden the write method in my cache store class,
which is set as the cache store factory as below:


myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));




Re: How to achieve writethrough with ignite data streamer

Posted by Evgenii Zhuravlev <e....@gmail.com>.
Hi,

You need to set IgniteDataStreamer.allowOverwrite(true). As the javadoc
says: "Note that when this flag is {@code false}, updates will not be
propagated to the cache store (i.e. {@link #skipStore()} flag will be set to
{@code true} implicitly)."
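
To make the relationship between the two flags concrete, here is a toy model
in plain Java. It is not Ignite code and does not claim to show how Ignite
implements the streamer: WriteThroughToy, ToyStore, ToyStreamer and
rowsWritten are hypothetical names made up for this sketch, and the model
ignores everything else allowOverwrite does (such as whether existing entries
are replaced). It only mirrors the rule quoted from the javadoc: while
allowOverwrite is false, skipStore is implicitly true, so entries reach the
cache but never the store.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: NOT Ignite code. All names here are hypothetical. */
public class WriteThroughToy {
    /** Stand-in for a cache store backed by a database. */
    static class ToyStore {
        final Map<Long, String> rows = new HashMap<>();

        void write(Long key, String val) {
            rows.put(key, val);
        }
    }

    /** Stand-in for a data streamer that feeds a write-through cache. */
    static class ToyStreamer {
        private final Map<Long, String> cache;
        private final ToyStore store;

        // false by default, like the flag the thread is discussing
        private boolean allowOverwrite = false;

        ToyStreamer(Map<Long, String> cache, ToyStore store) {
            this.cache = cache;
            this.store = store;
        }

        void allowOverwrite(boolean flag) {
            this.allowOverwrite = flag;
        }

        /** Rule from the quoted javadoc: allowOverwrite == false implies skipStore == true. */
        boolean skipStore() {
            return !allowOverwrite;
        }

        void addData(Long key, String val) {
            // The entry always lands in the cache...
            cache.put(key, val);

            // ...but reaches the store only when skipStore is not in effect.
            if (!skipStore())
                store.write(key, val);
        }
    }

    /** Streams one entry and returns how many rows reached the store. */
    public static int rowsWritten(boolean allowOverwrite) {
        Map<Long, String> cache = new HashMap<>();
        ToyStore store = new ToyStore();

        ToyStreamer streamer = new ToyStreamer(cache, store);
        streamer.allowOverwrite(allowOverwrite);
        streamer.addData(1L, "order-1");

        return store.rows.size();
    }

    public static void main(String[] args) {
        System.out.println("allowOverwrite=false -> rows in store: " + rowsWritten(false));
        System.out.println("allowOverwrite=true  -> rows in store: " + rowsWritten(true));
    }
}
```

In this model the first call leaves the store empty and the second writes one
row, which is the symptom described in the question: data shows up in the
cache while the store's write method is never called.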


Evgenii

