Posted to user@ignite.apache.org by Stephen Darlington <st...@gridgain.com> on 2023/03/01 09:40:23 UTC

Re: Performance of data stream on 3 cluster node.

You might want to check the data distribution. You can use control.sh --cache distribution to do that.
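
For a rough sanity check from code, something like the sketch below reads similar information through the affinity API (it only shows how partitions are assigned to nodes, not how many rows each partition holds; control.sh --cache distribution reports the sizes). The helper name and the assumption that you already have a started Ignite thick-client instance are illustrative, not part of the original thread:

    import org.apache.ignite.Ignite;
    import org.apache.ignite.cache.affinity.Affinity;
    import org.apache.ignite.cluster.ClusterNode;

    // Prints how many primary and backup partitions of the given cache each
    // server node owns. A heavy imbalance here would explain one node doing
    // most of the streaming work.
    static void printPartitionDistribution(Ignite ignite, String cacheName) {
        Affinity<Object> aff = ignite.affinity(cacheName);

        for (ClusterNode node : ignite.cluster().forServers().nodes()) {
            System.out.println(node.consistentId()
                + ": primaries=" + aff.primaryPartitions(node).length
                + ", backups=" + aff.backupPartitions(node).length);
        }
    }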

> On 28 Feb 2023, at 20:32, John Smith <ja...@gmail.com> wrote:
> 
> The last thing I can add to clarify is that the 3-node cluster is a centralized cluster and the CSV loader is a thick client running on its own machine.
> 
> On Tue, Feb 28, 2023 at 2:52 PM John Smith <java.dev.mtl@gmail.com <ma...@gmail.com>> wrote:
> >> Btw, when I run a query like SELECT COLUMN_2, COUNT(COLUMN_1) FROM MY_TABLE GROUP BY COLUMN_2; the query runs full tilt at 100% on all 3 nodes and returns in a respectable manner.
> >> 
> >> So I'm not sure what's going on, but with the data streamer I guess most of the writes are pushed to THE ONE node and the others are busy writing the backups, or the network used to push/back up can't keep up?
> >> The same behaviour happens with a replicated table when using the data streamer: one node seems to be running at almost 100% while the others hover at 40-50%.
> >> The fastest I could get the streamer to run is with backups turned off, but it's the same thing: one node runs full tilt while the others are "slowish".
>> 
>> Queries are ok, all nodes are fully utilized.
>> 
>> On Tue, Feb 28, 2023 at 12:54 PM John Smith <java.dev.mtl@gmail.com <ma...@gmail.com>> wrote:
> >>> Hi, so I'm using it in a pretty straightforward kind of way, at least I think...
>>> 
> >>> I'm loading 35 million lines from CSV into an SQL table. I decided to use the streamer as I figured it would still be a lot faster than batching SQL INSERTs.
> >>> I tried with backups=0 and backups=1 (I'd prefer to have backups on):
> >>> 1- With 0 backups: 6 minutes to load
> >>> 2- With 1 backup: 15 minutes to load
>>> 
> >>> In both cases I still see the same behaviour: one machine seems to be taking the brunt of the work...
>>> 
> >>> I'm reading the CSV file line by line and calling streamer.addData().
>>> 
>>> The table definition is as follows...
>>> CREATE TABLE PUBLIC.MY_TABLE (
>>>     COLUMN_1 VARCHAR(32) NOT NULL,
>>>     COLUMN_2 VARCHAR(64) NOT NULL,
>>>     CONSTRAINT PHONE_CARRIER_IDS_PK PRIMARY KEY (COLUMN_1)
>>> ) with "template=parallelTpl, backups=0, key_type=String, value_type=MyObject";
>>> CREATE INDEX MY_TABLE_COLUMN_2_IDX ON PUBLIC.MY_TABLE (COLUMN_2);
>>> 
>>>         String fileName = "my_file";
>>> 
>>>         final String cacheNameDest = "MY_TABLE";
>>> 
>>>         try(
>>>                 Ignite igniteDest = configIgnite(Arrays.asList("...:47500..47509", "...:47500..47509", "...:47500..47509"), "ignite-dest");
>>>                 IgniteCache<BinaryObject, BinaryObject> cacheDest = igniteDest.getOrCreateCache(cacheNameDest).withKeepBinary();
>>>                 IgniteDataStreamer<BinaryObject, BinaryObject> streamer = igniteDest.dataStreamer(cacheNameDest);
>>>         ) {
>>>             System.out.println("Ignite started.");
>>>             long start = System.currentTimeMillis();
>>> 
>>>             System.out.println("Cache size: " + cacheDest.size(CachePeekMode.PRIMARY));
>>>             System.out.println("Default");
>>>             System.out.println("1d");
>>>             
>>>             IgniteBinary binaryDest = igniteDest.binary();
>>> 
>>>             try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
>>>                 int count = 0;
>>> 
>>>                 String line;
>>>                 while ((line = br.readLine()) != null) {
>>> 
>>>                     String[] parts = line.split("\\|");
>>> 
>>>                     BinaryObjectBuilder keyBuilder = binaryDest.builder("String");
>>>                     keyBuilder.setField("COLUMN_1", parts[1], String.class);
>>>                     BinaryObjectBuilder valueBuilder = binaryDest.builder("PhoneCarrier");
>>>                     valueBuilder.setField("COLUMN_2", parts[3], String.class);
>>> 
>>>                     streamer.addData(keyBuilder.build(), valueBuilder.build());
>>> 
>>>                     count++;
>>>                     
>>>                     if ((count % 10000) == 0) {
>>>                         System.out.println(count);
>>>                     }
>>>                 }
>>>                 streamer.flush();
>>>                 long end = System.currentTimeMillis();
>>>                 System.out.println("Ms: " + (end - start));
>>>             } catch (IOException e) {
>>>                 e.printStackTrace();
>>>             }
>>>         }
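
If the sender itself is the bottleneck, the streamer exposes a few knobs worth experimenting with. A minimal sketch, assuming the same igniteDest and cacheNameDest as in the snippet above; the specific numbers are illustrative guesses, not recommendations:

    try (IgniteDataStreamer<BinaryObject, BinaryObject> streamer = igniteDest.dataStreamer(cacheNameDest)) {
        streamer.allowOverwrite(false);        // default; fastest path when keys are known to be new
        streamer.perNodeBufferSize(1024);      // entries buffered per node before a batch is sent
        streamer.perNodeParallelOperations(8); // concurrent in-flight batches per node
        streamer.autoFlushFrequency(5_000);    // flush buffered entries at least every 5 seconds

        // ... the same addData() loop as above ...
    } // close() flushes anything still buffered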
>>> 
>>> On Tue, Feb 28, 2023 at 11:00 AM Jeremy McMillan <jeremy.mcmillan@gridgain.com <ma...@gridgain.com>> wrote:
>>>> Have you tried tracing the workload on the 100% and 40% nodes for comparison? There just isn't enough detail in your question to help predict what should be happening with the cluster workload. For a starting point, please identify your design goals. It's easy to get confused by advice that seeks to help you do something you don't want to do.
>>>> 
>>>> Some things to think about include how the stream workload is composed. How should/would this work if there were only one node? How should behavior change as nodes are added to the topology and the test is repeated?
>>>> 
>>>> Gedanken: what if the data streamer is doing some really expensive operations as it feeds the data into the stream, but the nodes can very cheaply put the processed data into their cache partitions? In this case, for example, the expensive operations should be refactored into a stream transformer that will move the workload from the stream sender to the stream receivers. https://ignite.apache.org/docs/latest/data-streaming#stream-transformer
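
For the stream-transformer idea above, the pattern on the linked docs page looks roughly like the word-count sketch below. Cache and variable names are illustrative, and a custom receiver requires allowOverwrite(true):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.stream.StreamTransformer;

    Ignite ignite = Ignition.ignite(); // assumes a node or thick client was already started in this JVM
    IgniteCache<String, Long> counts = ignite.getOrCreateCache("wordCounts"); // cache must exist before streaming

    try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer("wordCounts")) {
        stmr.allowOverwrite(true); // needed for a custom receiver/transformer

        // The entry processor runs on the node that owns each key, so the
        // per-entry work is done by the receivers, not the sender.
        stmr.receiver(StreamTransformer.from((e, arg) -> {
            Long val = e.getValue();
            e.setValue(val == null ? 1L : val + 1);
            return null;
        }));

        for (String word : "some sample text to stream".split(" "))
            stmr.addData(word, 1L);
    }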
>>>> 
>>>> Also gedanken: what if the data distribution is skewed such that one node gets more data than 2x the data sent to other partitions because of affinity? In this case, for example, changes to affinity/colocation design or changes to cluster topology (more nodes with greater CPU to RAM ratio?) can help distribute the load so that no single node becomes a bottleneck.
>>>> 
>>>> On Tue, Feb 28, 2023 at 9:27 AM John Smith <java.dev.mtl@gmail.com <ma...@gmail.com>> wrote:
> >>>>> Hi, I'm using the data streamer to insert into a 3-node cluster. I have noticed that one node is pegged at 100% CPU while the others are at around 40%.
>>>>> 
>>>>> Is that normal?
>>>>> 
>>>>> 


Re: Performance of data stream on 3 cluster node.

Posted by Stephen Darlington <st...@gridgain.com>.
This is a great blog that explains how data is distributed in an Ignite cluster:

https://www.gridgain.com/resources/blog/data-distribution-in-apache-ignite


Re: Performance of data stream on 3 cluster node.

Posted by John Smith <ja...@gmail.com>.
My key is phone_number and they are all unique... I'll check with the command...


Re: Performance of data stream on 3 cluster node.

Posted by Stephen Darlington <st...@gridgain.com>.
The streamer doesn’t determine where the data goes. It just efficiently sends it to the correct place. 

If your data is skewed in some way so that there is more data in some partitions than others, then you could find one machine with more work to do than others. All else being equal, you’ll also get better distribution with more than three nodes.
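
As a concrete illustration of that point, the affinity API shows the key-to-partition-to-node mapping the streamer follows. A small sketch, assuming an already-started Ignite instance named ignite and a plain String key (the loader in this thread builds a binary key, but the idea is the same; the cache name and sample key are illustrative):

    import org.apache.ignite.cache.affinity.Affinity;
    import org.apache.ignite.cluster.ClusterNode;

    Affinity<String> aff = ignite.affinity("MY_TABLE");

    String key = "15145550123";                 // illustrative phone-number key
    int part = aff.partition(key);              // key -> partition (hash based)
    ClusterNode owner = aff.mapKeyToNode(key);  // partition -> primary node

    System.out.println("key " + key + " -> partition " + part
        + " -> node " + owner.consistentId());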


Re: Performance of data stream on 3 cluster node.

Posted by John Smith <ja...@gmail.com>.
Ok thanks. I just thought the streamer would be more uniform.
