Posted to user@ignite.apache.org by Naveen <na...@gmail.com> on 2018/03/07 13:48:14 UTC

Usage of DataStreamer for bulk loading

Hi,

We are using Ignite 2.3.
We have a requirement to migrate data from an existing in-memory solution to
Ignite. It is a one-time migration.

The data is available in CSV files with a delimiter. We have split the source
files into multiple chunks, and each thread processes one file.
Here is the code that reads a file line by line and calls the
dataStreamer.addData method.


    while (sc.hasNextLine()) {
        ct++;
        String line = sc.nextLine();
        //System.out.println(line);
        String[] tokens = line.split(Constants.Delimter, -1);
        //System.out.println("No of tokens " + tokens.length);
        //PARTY_ID~ASSOCIATED_PARTY_ID~UPDATEDDATETIME~UPDATEDBY
        aASSOCIATED_PARTIES = new ASSOCIATED_PARTIES();
        aASSOCIATED_PARTIES.setPARTY_ID(tokens[0]);
        aASSOCIATED_PARTIES.setASSOCIATED_PARTY_ID(tokens[1]);
        aASSOCIATED_PARTIES.setUPDATEDBY(tokens[3]);
        aASSOCIATED_PARTIES.setUPDATEDDATETIME(new Timestamp(System.currentTimeMillis()));

        streamer.perNodeBufferSize(Constants.perNodeBufferSize);
        streamer.perNodeParallelOperations(Constants.perNodeParallelOperations);

        streamer.addData(tokens[0], aASSOCIATED_PARTIES);
        if (ct > 0 && ct % 10000 == 0)
            System.out.println("Done: " + ct);
    }

My question is: how do I call streamer.addData(tokens[0],
aASSOCIATED_PARTIES) for a batch instead of for every record? As written,
I believe it is called for every line that is read from the file.
I would like it to be called once per, say, 1000 records: accumulate
1000 records, then add them to the cache as one batch.
How could we do this, either batch-wise or time-wise (for example, writing
once every minute)?
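A minimal sketch of the count-based batching described above, not a definitive implementation. BatchAccumulator and its sink parameter are hypothetical names introduced only for illustration; with Ignite the sink could be something like batch -> streamer.addData(batch), since addData also accepts a Map. Note that duplicate keys within one batch overwrite each other, because the batch is a Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Collects records and hands them to a sink in batches of a fixed size. */
class BatchAccumulator<K, V> {
    private final int batchSize;
    // Hypothetical sink; with Ignite this could be: batch -> streamer.addData(batch)
    private final Consumer<Map<K, V>> sink;
    private Map<K, V> current = new LinkedHashMap<>();

    BatchAccumulator(int batchSize, Consumer<Map<K, V>> sink) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one record and emits the batch once batchSize distinct keys are buffered. */
    void add(K key, V value) {
        current.put(key, value);
        if (current.size() >= batchSize) flush();
    }

    /** Emits whatever is buffered; call once at end of input for the last partial batch. */
    void flush() {
        if (current.isEmpty()) return;
        Map<K, V> batch = current;
        current = new LinkedHashMap<>();
        sink.accept(batch);
    }
}
```

In the loop above, the per-record call would become acc.add(tokens[0], aASSOCIATED_PARTIES), followed by a single acc.flush() after the loop. For the time-wise variant, the streamer's own autoFlushFrequency(long) setting is probably a closer fit than a hand-written timer.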

Thanks
Naveen



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Usage of DataStreamer for bulk loading

Posted by Stanislav Lukyanov <st...@gmail.com>.
Hi Naveen,

There is no scheduled date for it yet. The 2.4 release has just been voted on
and accepted.
Based on the version history (https://ignite.apache.org/download.cgi), the
average time between releases is about 2-3 months, so it is quite possible
that 2.5 will be released in that time frame, but there are no guarantees.
Note that there are also more frequent commercial releases provided by
third-party vendors.

Thanks,
Stan




Re: Usage of DataStreamer for bulk loading

Posted by Naveen <na...@gmail.com>.
Thanks Alexey.

We are in the middle of development and may go live in the next 3 to 4
months.
If the COPY command is available by then, we are good.
When is 2.5 going to be released?

Thanks
Naveen




Re: Usage of DataStreamer for bulk loading

Posted by Alexey Kuznetsov <ak...@apache.org>.
Hi Naveen,

FYI, there is an issue in JIRA: IGNITE-6917 SQL: implement COPY command for
efficient data loading
https://issues.apache.org/jira/browse/IGNITE-6917

It is already merged to master and will be part of Ignite 2.5.


-- 
Alexey Kuznetsov

Re: Usage of DataStreamer for bulk loading

Posted by Gaurav Bajaj <ga...@gmail.com>.
Hi Naveen,

In our case the biggest performance gain came when we started adding data
to the IgniteDataStreamer in parallel.


Earlier we were doing:
entryMapToBeStreamed.entrySet().stream().forEach(dataStreamer::addData);

Performance improved tremendously when we did something like this:
entryMapToBeStreamed.entrySet().parallelStream().forEach(dataStreamer::addData);


If you are not doing this (calling addData() from multiple threads), I would
do that first and check performance.
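A self-contained sketch of the parallelStream approach, runnable without a cluster. RecordingSink is a hypothetical stand-in for the data streamer (it is not an Ignite class), and the sketch assumes, as this message implies, that the real addData may be called from several threads at once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical thread-safe stand-in for the data streamer; not part of Ignite. */
class RecordingSink<K, V> {
    final Map<K, V> received = new ConcurrentHashMap<>();
    final Set<String> threads = ConcurrentHashMap.newKeySet();

    void addData(Map.Entry<K, V> entry) {
        threads.add(Thread.currentThread().getName()); // remember which thread delivered it
        received.put(entry.getKey(), entry.getValue());
    }
}

class ParallelAddDemo {
    /** Feeds every entry to the sink from the common fork-join pool. */
    static <K, V> void load(Map<K, V> entries, RecordingSink<K, V> sink) {
        entries.entrySet().parallelStream().forEach(sink::addData);
    }

    public static void main(String[] args) {
        Map<String, String> entries = new HashMap<>();
        for (int i = 0; i < 100_000; i++) entries.put("party-" + i, "assoc-" + i);

        RecordingSink<String, String> sink = new RecordingSink<>();
        load(entries, sink);

        System.out.println("entries loaded: " + sink.received.size());
        System.out.println("threads used:   " + sink.threads.size());
    }
}
```

The number of threads reported depends on the machine, because parallelStream() uses the common fork-join pool, which is sized from the available processors by default.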


Best Regards,
Gaurav


(In reply to the message Naveen <na...@gmail.com> sent on Mon, Mar 12, 2018 at 9:55 AM, which appears below in this thread.)

Re: Usage of DataStreamer for bulk loading

Posted by Naveen <na...@gmail.com>.
Hi Gaurav,

Decoupling file reading from cache streaming requires some kind of messaging
layer in between, right? Initially, since this is a bulk activity, I did not
want a messaging layer consuming additional memory and system resources.

But if the whole purpose of using the DataStreamer for bulk loading is
otherwise defeated, I may have to go for it.

If my understanding is correct, are we trying to implement the solution
below?

1. Read in batches into a collection object, say one for every 10K records.
2. Publish the collection object to Solace (messaging).
3. Consume the collection object from the Solace queue and use the
DataStreamer (with Map entries) to load it into the cache.

Or can we avoid the messaging layer by using StreamReceivers and
DataStreamers to decouple the read part from the cache streaming part?

If you have a code snippet that does this, would you be able to share it?


Thanks
Naveen





Re: Usage of DataStreamer for bulk loading

Posted by Gaurav Bajaj <ga...@gmail.com>.
Hi Naveen,

I had a similar situation. There are two things you can do:
1. Decouple file reading from cache streaming, so that both can be handled
in separate threads asynchronously.

2. Once you have the data from the CSV in a collection, use parallel streams
to add data to the streamer from multiple threads.
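One way to sketch point 1 inside a single JVM is a bounded in-memory queue between the reader thread and a few worker threads, with no external messaging layer. This is an illustration under assumptions, not the setup actually used here: DecoupledLoader, its parameters, and the sink are hypothetical names, the sink stands in for streamer.addData(key, value) and must be thread-safe, and error handling in the workers is omitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.BiConsumer;

class DecoupledLoader {
    // Unique sentinel instance; compared by identity, so no real line can match it.
    private static final String END = new String("END");

    /**
     * Reads lines on the calling thread; parses and forwards them on worker threads.
     * The sink receives (first token, all tokens) for every line.
     */
    static void load(Iterable<String> lines, String delimiterRegex, int workers,
                     BiConsumer<String, String[]> sink) {
        // Bounded queue: the reader blocks when the workers fall behind.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
        Thread[] consumers = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    String line;
                    while ((line = queue.take()) != END) {
                        String[] tokens = line.split(delimiterRegex, -1);
                        sink.accept(tokens[0], tokens);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }
        try {
            for (String line : lines) queue.put(line);
            for (int i = 0; i < workers; i++) queue.put(END); // one sentinel per worker
            for (Thread t : consumers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        }
    }
}
```

The queue capacity of 10,000 is an arbitrary illustrative value; it only bounds how far the reader can run ahead of the workers.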

Thanks,
Gaurav

(In reply to the message "Naveen" <na...@gmail.com> sent on 09-Mar-2018 3:05 PM, which appears below in this thread.)

Re: Usage of DataStreamer for bulk loading

Posted by Naveen <na...@gmail.com>.
Hi DH

I am not using any custom StreamReceiver; my requirement is very simple.
I have a huge amount of data in CSV. I read it line by line, parse each line,
populate the POJO, and use the DataStreamer to load the data into the cache.

    while (sc.hasNextLine()) {
        ct++;
        String line = sc.nextLine();
        String[] tokens = line.split(Constants.Delimter, -1);
        aASSOCIATED_PARTIES = new ASSOCIATED_PARTIES();

        aASSOCIATED_PARTIES.setASSOCIATED_PARTY_ID(tokens[1]);
        aASSOCIATED_PARTIES.setUPDATEDBY(tokens[3]);

        streamer.perNodeBufferSize(Constants.perNodeBufferSize);
        streamer.perNodeParallelOperations(Constants.perNodeParallelOperations);

        streamer.addData(tokens[0], aASSOCIATED_PARTIES);
    }

As you mentioned, I have made sure that there is one DataStreamer for each
client; after this change, it stopped failing.
Are there any ways we can improve the performance?

Thanks
Naveen




Re: Usage of DataStreamer for bulk loading

Posted by David Harvey <dh...@jobcase.com>.
You can give it a Map<K,V> also, but that will not be substantially
faster, because the data streamer routes by affinity key: it has to process
each record and route it to the buffer for the node that owns that key. The
data streamer will collect 8192 records on the client node for each server
node before sending. If you write a custom StreamReceiver, what you get
on the receiving side is a collection of Map entries with 8192 elements.
Note that the collection contains the records whose affinity keys map to
that node, and it is not broken up by partition.
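The routing described above can be modelled with a toy sketch. This is not Ignite's implementation: PerNodeBufferModel, NodeSender, and nodeFor are hypothetical names, and the hash-modulo routing merely stands in for real affinity (which maps a key to a partition and the partition to a node). It illustrates why passing a Map gains little: every entry is still routed on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of client-side per-node buffering; not Ignite code. */
class PerNodeBufferModel<K, V> {
    /** Hypothetical callback that "sends" one full buffer to one node. */
    interface NodeSender<K, V> { void send(int node, Map<K, V> batch); }

    private final int nodeCount;
    private final int perNodeBufferSize;
    private final NodeSender<K, V> sender;
    private final List<Map<K, V>> buffers = new ArrayList<>();

    PerNodeBufferModel(int nodeCount, int perNodeBufferSize, NodeSender<K, V> sender) {
        this.nodeCount = nodeCount;
        this.perNodeBufferSize = perNodeBufferSize;
        this.sender = sender;
        for (int i = 0; i < nodeCount; i++) buffers.add(new HashMap<>());
    }

    /** Toy affinity: hash modulo node count (an assumption for illustration only). */
    int nodeFor(K key) { return Math.floorMod(key.hashCode(), nodeCount); }

    /** Routes a single record to its node's buffer and sends the buffer when full. */
    void addData(K key, V value) {
        int node = nodeFor(key);
        Map<K, V> buf = buffers.get(node);
        buf.put(key, value);
        if (buf.size() >= perNodeBufferSize) flushNode(node);
    }

    /** A Map argument is unpacked and routed record by record, as above. */
    void addData(Map<K, V> entries) {
        entries.forEach((k, v) -> addData(k, v));
    }

    /** Sends all partially filled buffers. */
    void flush() {
        for (int node = 0; node < nodeCount; node++) flushNode(node);
    }

    private void flushNode(int node) {
        Map<K, V> buf = buffers.get(node);
        if (buf.isEmpty()) return;
        buffers.set(node, new HashMap<>());
        sender.send(node, buf);
    }
}
```

In this model, each batch handed to the sender contains only keys that route to one node, which mirrors the note above that the receiving side sees records grouped by node rather than by partition.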

Considerations:
- Use a separate DataStreamer per client thread; otherwise the flush gets
complicated.
- If you use a custom StreamReceiver, the stream receiver is serialized
once when the stream is set up, but the serialized form is sent in every
buffer. Put it in its own class and limit the amount of data that will
be serialized with it. I actually had an IgniteCache reference in the
class, which caused the instance to serialize to 100s of KB, which was
getting sent each time.
- The default for peer class loading is SHARED. If you change the
StreamReceiver code, you have to shut down all of the "master" (i.e.,
client) nodes to get it to replace the class, apparently even if the other
clients did not load that specific class.
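The serialization point in the second consideration can be illustrated with plain Java serialization. This is only an analogy under assumptions: Ignite uses its own marshallers, so actual sizes will differ, and HeavyReceiver, SlimReceiver, and their fields are hypothetical classes that do not implement the real StreamReceiver interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class ReceiverSizeDemo {
    /** Hypothetical receiver that drags a large object along when serialized. */
    static class HeavyReceiver implements Serializable {
        private static final long serialVersionUID = 1L;
        // Stands in for something like a cache reference captured by the class.
        final byte[] capturedState = new byte[200 * 1024];
        final String targetField = "UPDATEDBY";
    }

    /** Same logic, but the bulky state is transient and therefore not serialized. */
    static class SlimReceiver implements Serializable {
        private static final long serialVersionUID = 1L;
        transient byte[] lazilyResolvedState; // would be looked up again on the receiving side
        final String targetField = "UPDATEDBY";
    }

    /** Returns the number of bytes Java serialization produces for the object. */
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("heavy receiver bytes: " + serializedSize(new HeavyReceiver()));
        System.out.println("slim receiver bytes:  " + serializedSize(new SlimReceiver()));
    }
}
```

If the serialized form really is sent with every buffer, as described above, the difference between the two sizes is paid once per buffer rather than once per stream.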

-DH



(In reply to the original message Naveen <na...@gmail.com> sent on Wed, Mar 7, 2018 at 8:48 AM, which appears at the top of this thread.)
