Posted to user@ignite.apache.org by begineer <re...@gmail.com> on 2018/03/31 09:38:23 UTC

Data Streamer not flushing data to cache

Hi, this must be something very simple. I am adding 100 items to a data
streamer, but it is not flushing the items to the cache. Is there a setting
which enables that? The cache size is zero. Am I doing something wrong?

public class DataStreamerExample {
	public static void main(String[] args) throws InterruptedException {
		Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
		CacheConfiguration<Long, Long> config = new CacheConfiguration<>("mycache");
		IgniteCache<Long, Long> cache = ignite.getOrCreateCache(config);
		IgniteDataStreamer<Long, Long> streamer = ignite.dataStreamer("mycache");
		LongStream.range(1, 100).forEach(l -> {
			System.out.println("Adding to streamer " + l);
			streamer.addData(l, l);
		});
		System.out.println(streamer.perNodeBufferSize());
		System.out.println("Cache size : " + cache.size(CachePeekMode.ALL));
		cache.query(new ScanQuery<>()).getAll().stream().forEach(entry -> {
			System.out.println("cache Entry: " + entry.getKey() + " " + entry.getValue());
		});
	}
}




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Data Streamer not flushing data to cache

Posted by Andrey Kuznetsov <st...@gmail.com>.
Indeed, the only reliable way is flush/close. Nonzero automatic flush
frequency doesn't provide the same guarantee.

2018-03-31 21:11 GMT+03:00 begineer <re...@gmail.com>:

> One more query. Would it never flush the data if nothing more is added to
> the streamer and the current size is less than the buffer size?
> What is the default time? I can see only the flush frequency.



-- 
Best regards,
  Andrey Kuznetsov.

Re: Data Streamer not flushing data to cache

Posted by begineer <re...@gmail.com>.
One more query. Would it never flush the data if nothing more is added to
the streamer and the current size is less than the buffer size?
What is the default time? I can see only the flush frequency.




Re: Data Streamer not flushing data to cache

Posted by begineer <re...@gmail.com>.
Thanks for the reply. It works after invoking flush().




Re: Data Streamer not flushing data to cache

Posted by Andrey Kuznetsov <st...@gmail.com>.
Hello!

The simplest way to ensure your data has reached the cache is to use
IgniteDataStreamer in a try-with-resources block. In some rare scenarios it
can make sense to call flush() or close() on the streamer instance
directly.
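
To illustrate why the try-with-resources form helps, here is a minimal
sketch using a toy stand-in for the streamer. ToyStreamer, its buffer
capacity of 512 and the HashMap playing the role of the cache are all
assumptions made for illustration; none of this is Ignite's code. The
only point is the shape: entries sit in a buffer until it fills up or
until flush()/close() is called, and try-with-resources guarantees the
close().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a buffering streamer; NOT Ignite's implementation. */
class ToyStreamer implements AutoCloseable {
    private final Map<Long, Long> sink;   // plays the role of the cache
    private final int bufferSize;         // hypothetical capacity
    private final List<long[]> buffer = new ArrayList<>();

    ToyStreamer(Map<Long, Long> sink, int bufferSize) {
        this.sink = sink;
        this.bufferSize = bufferSize;
    }

    void addData(long key, long value) {
        buffer.add(new long[] {key, value});
        if (buffer.size() >= bufferSize)
            flush();   // only a full buffer is sent automatically
    }

    void flush() {
        for (long[] e : buffer)
            sink.put(e[0], e[1]);
        buffer.clear();
    }

    @Override public void close() {
        flush();       // closing pushes whatever is still buffered
    }
}

class TryWithResourcesDemo {
    /** Returns {size while the streamer is still open, size after close}. */
    static int[] run() {
        Map<Long, Long> cache = new HashMap<>();
        int[] sizes = new int[2];
        try (ToyStreamer streamer = new ToyStreamer(cache, 512)) {
            for (long l = 1; l < 100; l++)   // same range as the original post
                streamer.addData(l, l);
            sizes[0] = cache.size();         // buffer never filled, nothing sent
        }
        sizes[1] = cache.size();             // close() has flushed the buffer
        return sizes;
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("Before close: " + sizes[0] + ", after close: " + sizes[1]);
    }
}
```

With the real API the same pattern would wrap the ignite.dataStreamer("mycache")
call from the original post in the try header, and the cache size would be
read after the block ends.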


-- 
Best regards,
  Andrey Kuznetsov.

Re: Data Streamer not flushing data to cache

Posted by David Harvey <dh...@jobcase.com>.
By default, DataStreamer will only send a full buffer, unless you
explicitly flush or close it or, as suggested, close it implicitly. You
can also set a flush timer.

The last time I looked (2.3), the flush timer was implemented to flush
periodically, regardless of when data was last added to the stream. For
example, if you want to ensure that all records older than 5 seconds have
been stored, setting the flush timer to 5 seconds will cause a flush every
5 seconds, even when all of the outstanding data was added in the last
10 ms. So the flush timer creates an (unnecessary) performance anomaly
under heavy load.

Note that while the DataStreamer is thread safe, if you have a
multi-threaded producer, it works much better to have a DataStreamer per
producer thread, especially if there are explicit or timer-driven flushes.
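
A rough sketch of the one-streamer-per-producer-thread layout follows.
As a simplification, the toy streamer here is not thread safe at all
(unlike the real DataStreamer), which makes the per-thread ownership
explicit. ToyThreadStreamer, the buffer size of 64 and the
ConcurrentHashMap standing in for the cache are assumptions for
illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy buffering streamer, deliberately NOT thread safe; not Ignite code. */
class ToyThreadStreamer implements AutoCloseable {
    private final Map<Long, Long> sink;
    private final int bufferSize;
    private final List<long[]> buffer = new ArrayList<>();

    ToyThreadStreamer(Map<Long, Long> sink, int bufferSize) {
        this.sink = sink;
        this.bufferSize = bufferSize;
    }

    void addData(long key, long value) {
        buffer.add(new long[] {key, value});
        if (buffer.size() >= bufferSize)
            flush();
    }

    void flush() {
        for (long[] e : buffer)
            sink.put(e[0], e[1]);
        buffer.clear();
    }

    @Override public void close() {
        flush();
    }
}

class PerThreadStreamerDemo {
    /** Starts the given number of producers, each with its own streamer. */
    static int run(int threads, int perThread) {
        Map<Long, Long> cache = new ConcurrentHashMap<>();   // shared sink
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final long base = (long) t * perThread;          // disjoint key range
            Thread w = new Thread(() -> {
                // The streamer is created, used and closed by one thread only.
                try (ToyThreadStreamer s = new ToyThreadStreamer(cache, 64)) {
                    for (long i = 0; i < perThread; i++)
                        s.addData(base + i, base + i);
                }
            });
            workers.add(w);
            w.start();
        }
        try {
            for (Thread w : workers)
                w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return cache.size();
    }

    public static void main(String[] args) {
        System.out.println("Entries stored: " + run(4, 100));   // prints "Entries stored: 400"
    }
}
```

Because each producer owns its buffer, a flush or close in one thread
never has to wait on data another thread is still adding.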


-DH


