You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Александр Савинов <ra...@mail.ru> on 2016/05/23 07:35:17 UTC

Data Streamer

Hello.
I have a problem with stream API and Ignite. The value of "sum" variable should be 1000000 (that equals to length of file test.csv), but it equals 999424. If file length is small (10 or even 1000) there is nothing in cache.
Thank you.
Если вы знаете русский, ответьте, пожалуйста, на русском.
IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
igniteConfiguration.setPeerClassLoadingEnabled(true);
Ignite ignite = Ignition.start(igniteConfiguration);
CacheConfiguration<Integer, Long> cacheConfiguration = new CacheConfiguration<>("cache");
IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cacheConfiguration);
IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache");
streamer.receiver(StreamTransformer.from((entry, arg)->{
    Long value = entry.getValue();
    entry.setValue(value==null ? 1L : value + 1L);
    return entry;
}));
Stream<String> fileStream = Files.lines(Paths.get("test.csv"));
fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
cache.forEach((entry)->System.out.println(entry.getKey() + ": " + entry.getValue()));
int s = 0;
Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator();
while(iterator.hasNext()){
    Cache.Entry<Integer, Long> entry = iterator.next();
    s+=entry.getValue();
}
System.out.println(s);
cache.clear();
ignite.close();

-- 
С уважением, Александр.
----------------------------------------------------------------------

-- 
С уважением, Александр.

Re[2]: Data Streamer

Posted by Александр Савинов <ra...@mail.ru>.
 Thanks you! It works correctly.


>Понедельник, 23 мая 2016, 11:11 +03:00 от Vladimir Ozerov <vo...@gridgain.com>:
>
>Hi Alexander,
>
>Please make sure that you flush data streamer before checking the "sum" value:
>fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
>streamer.flush();
>Vladimir.
>
>On Mon, May 23, 2016 at 10:35 AM, Александр Савинов  < rain-sanek@mail.ru > wrote:
>>
>>Hello.
>>I have a problem with stream API and Ignite. The value of "sum" variable should be 1000000 (that equals to length of file test.csv), but it equals 999424. If file length is small (10 or even 1000) there is nothing in cache.
>>Thank you.
>>Если вы знаете русский, ответьте, пожалуйста, на русском.
>>IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
>>igniteConfiguration.setPeerClassLoadingEnabled(true);
>>Ignite ignite = Ignition.start(igniteConfiguration);
>>CacheConfiguration<Integer, Long> cacheConfiguration = new CacheConfiguration<>("cache");
>>IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cacheConfiguration);
>>IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache");
>>streamer.receiver(StreamTransformer.from((entry, arg)->{
>>    Long value = entry.getValue();
>>    entry.setValue(value==null ? 1L : value + 1L);
>>    return entry;
>>}));
>>Stream<String> fileStream = Files.lines(Paths.get("test.csv"));
>>fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
>>cache.forEach((entry)->System.out.println(entry.getKey() + ": " + entry.getValue()));
>>int s = 0;
>>Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator();
>>while(iterator.hasNext()){
>>    Cache.Entry<Integer, Long> entry = iterator.next();
>>    s+=entry.getValue();
>>}
>>System.out.println(s);
>>cache.clear();
>>ignite.close();
>>
>>-- 
>>С уважением, Александр.
>>----------------------------------------------------------------------
>>
>>-- 
>>С уважением, Александр.
>


-- 
С уважением, Александр.

Re: Data Streamer

Posted by Vladimir Ozerov <vo...@gridgain.com>.
Hi Alexander,

Please make sure that you flush data streamer before checking the "sum"
value:

fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
streamer.flush();

Vladimir.


On Mon, May 23, 2016 at 10:35 AM, Александр Савинов <ra...@mail.ru>
wrote:

>
> Hello.
> I have a problem with stream API and Ignite. The value of "sum" variable
> should be 1000000 (that equals to length of file test.csv), but it equals
> 999424. If file length is small (10 or even 1000) there is nothing in cache.
> Thank you.
> Если вы знаете русский, ответьте, пожалуйста, на русском.
> <//...@ignite.apache.org>
>
> IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
> igniteConfiguration.setPeerClassLoadingEnabled(true);
> Ignite ignite = Ignition.start(igniteConfiguration);
> CacheConfiguration<Integer, Long> cacheConfiguration = new CacheConfiguration<>("cache");
> IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cacheConfiguration);
> IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache");
> streamer.receiver(StreamTransformer.from((entry, arg)->{
>     Long value = entry.getValue();
>     entry.setValue(value==null ? 1L : value + 1L);
>     return entry;
> }));
> Stream<String> fileStream = Files.lines(Paths.get("test.csv"));
> fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
> cache.forEach((entry)->System.out.println(entry.getKey() + ": " + entry.getValue()));
> int s = 0;
> Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator();
> while(iterator.hasNext()){
>     Cache.Entry<Integer, Long> entry = iterator.next();
>     s+=entry.getValue();
> }
> System.out.println(s);
> cache.clear();
> ignite.close();
>
>
>
> --
> С уважением, Александр.
>
> ------------------------------
>
> --
> С уважением, Александр.
>