Posted to user@flink.apache.org by Emmanuel <el...@msn.com> on 2015/03/12 18:30:28 UTC

Flink logs only written to one host

Hello,
I'm using a 3-node cluster (3 VMs, 3 CPUs each) with a parallelism of 9. I usually see taskmanager.out logs generated on only one of the 3 nodes when I use System.out.println() to print debug info in my main processing function.
Is this expected, or am I doing something wrong? I stream from a socket with socketTextStream; I understand that this runs in a single process, and I see that in the UI (using one slot only), but the computation task runs on 9 slots. That task includes the System.out.println() statement, yet the output only shows up in one host's .out log folder. The host is not always the same, so I have to tail the logs on all hosts, and I'm surprised by this behavior.
Am I just missing something? Are print statements to stdout aggregated on one host somehow? If so, how is this controlled? Why would that host change?
I would love to understand what is going on, and whether the 9 slots may somehow all be running on a single host, which would defeat the purpose.
Thanks for the insight
Emmanuel

RE: Flink logs only written to one host

Posted by Stephan Ewen <se...@apache.org>.
Hi Emmanuel!

I think there is currently no way to tell the scheduler to pick the "least
loaded host". That would be a feature we would need to add to Flink.

If you want, open a feature request, and give us some more information on
how you would expect this to behave.

Greetings,
Stephan
On 12.03.2015 at 21:08, "Emmanuel" <el...@msn.com> wrote:

> Thanks for the clarification, Gyula.
>
> I had 9 slots per node; however, each node only has 3 CPUs.
> So my parallelism here was 9, and all tasks were allocated to the 9 slots
> on one host.
> I understand the strategy of trying to minimize network I/O by sending to
> the same host, but in this case, where the number of slots exceeds the
> number of CPUs, the strategy fails to distribute the load across
> the 9 available CPUs.
>
> My reasoning for having more than 1 slot per CPU is that my tasks are not
> homogeneous, so one task may receive data while another is waiting for
> more data. So I may have many jobs running, with only some actually being
> used, because the data determines which job is used. In that case,
> ideally I want the working jobs distributed across available CPUs, not
> across available slots on the closest host.
> Is there a way to achieve that?
>
>
> ------------------------------
> Date: Thu, 12 Mar 2015 20:28:29 +0100
> Subject: Re: Flink logs only written to one host
> From: gyfora@apache.org
> To: user@flink.apache.org
>
> Hey,
>
> Let me clarify some things regarding distribute(). You should only specify
> a partitioning scheme such as distribute, shuffle, or groupBy when it
> actually matters to you how the data is partitioned across operator
> instances.
>
> By default, forwarding is applied, which works in the following way:
>
> When the output parallelism (the source in this case) is the same as the
> receiver parallelism (the map), the data is forwarded to the task that is on
> the same task manager, which reduces network costs. This also allows chaining
> of subtasks, which fully eliminates the serialization costs.
>
> When the output parallelism is smaller or larger (in your case it is 1, as
> the socket source is non-parallel), it applies a round-robin scheme, the same
> as distribute.
>
> So in your case you don't have to call distribute; you will get the same
> result without it.
>
> Cheers,
> Gyula
>
> On Thu, Mar 12, 2015 at 8:17 PM, Emmanuel <el...@msn.com> wrote:
>
> Hi,
>
> I changed my config to have a parallelism of 9 and 3 slots on each
> machine, and now it does distribute properly.
>
> The other day I was told I could have many more slots than CPUs available
> and the system would distribute the load properly between the hosts with
> available CPU time, but that doesn't seem to be the case here:
> with 9 slots on each host, the distribute partitioning still sends all 9
> tasks to the 9 slots on the same machine, even though it only has 3 CPUs
> available and the other 6 CPUs on the other machines were not running
> anything.
>
> The idea of having more slots than CPUs, to me, is that you can spin up
> more jobs and they'll be balanced as data comes in. But with few jobs, it
> seems odd that the tasks are all assigned to the same host just because it
> has slots available.
>
> Anyway, that fixed the issue for me for now. Looking forward to learning
> more about Flink.
>
> ------------------------------
> Date: Thu, 12 Mar 2015 20:00:32 +0100
> Subject: RE: Flink logs only written to one host
> From: fhueske@gmail.com
> To: user@flink.apache.org
>
>
> Can you check the JM log file how many slots are available?
>
> Slots are configured per TM. If you configure 9 slots and 3 TMs you end up
> with 27 slots, 9 on each TM.
> On Mar 12, 2015 7:55 PM, "Emmanuel" <el...@msn.com> wrote:
>
> It appears actually that the slots used are all on the same host.
> My guess is because I am using the default partitioning method (forward,
> which defaults to the same host)
>
> However I now tried .shuffle() and .distribute() without any luck:
>
> I have a
>
> DataStream<String> text = env.socketTextStream(inHostName, inPort);
>
> this is the one socket input stream.
>
> Adding text.distribute().map(...)
>
> does not seem to distribute the *.map()* process on the other hosts.
>
> Is this the correct way to use *.distribute()* on a stream input?
>
> Thanks
>
> Emmanuel
>
>
> ------------------------------
> From: eleroy@msn.com
> To: user@flink.apache.org
> Subject: Flink logs only written to one host
> Date: Thu, 12 Mar 2015 17:30:28 +0000
>
> Hello,
>
> I'm using a 3 nodes (3VMs) cluster, 3CPUs each, parallelism of 9,
> I usually only see taskmanager.out logs generated only on one of the 3
> nodes when I use the System.out.println() method, to print debug info in my
> main processing function.
>
> Is this expected? Or am I just doing something wrong?
> I stream from a socket with socketTextStream; I understand that this job
> runs on a single process, and I see that in the UI (using one slot only),
> but the computation task runs on 9 slots. That task includes the System.out.println()
> statement, yet it only shows on one host's .out log folder.
> The host is not always the same, so I have to tail all logs on all hosts,
> but I'm surprised by this behavior.
> Am I just missing something?
> Are 'print' statement to stdout aggregated on one host somehow? If so how
> is this controlled? Why would that host change?
>
> I would love to understand what is going on, and if maybe somehow the 9
> slots may be running on a single host which would defeat the purpose.
>
> Thanks for the insight
>
> Emmanuel
>
>
>

RE: Flink logs only written to one host

Posted by Emmanuel <el...@msn.com>.
Thanks for the clarification, Gyula.
I had 9 slots per node; however, each node only has 3 CPUs. So my parallelism here was 9, and all tasks were allocated to the 9 slots on one host.
I understand the strategy of trying to minimize network I/O by sending to the same host, but in this case, where the number of slots exceeds the number of CPUs, the strategy fails to distribute the load across the 9 available CPUs.
My reasoning for having more than 1 slot per CPU is that my tasks are not homogeneous, so one task may receive data while another is waiting for more data. So I may have many jobs running, with only some actually being used, because the data determines which job is used. In that case, ideally I want the working jobs distributed across available CPUs, not across available slots on the closest host.
Is there a way to achieve that?


Re: Flink logs only written to one host

Posted by Gyula Fóra <gy...@apache.org>.
Hey,

Let me clarify some things regarding distribute(). You should only specify
a partitioning scheme such as distribute, shuffle, or groupBy when it
actually matters to you how the data is partitioned across operator
instances.

By default, forwarding is applied, which works in the following way:

When the output parallelism (the source in this case) is the same as the
receiver parallelism (the map), the data is forwarded to the task that is on
the same task manager, which reduces network costs. This also allows chaining
of subtasks, which fully eliminates the serialization costs.

When the output parallelism is smaller or larger (in your case it is 1, as
the socket source is non-parallel), it applies a round-robin scheme, the same
as distribute.

So in your case you don't have to call distribute; you will get the same
result without it.
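
The two cases above can be sketched in plain Java. This is only a rough
model of the rule as described in this message, not Flink's actual
partitioner code: the class PartitionSketch and the method route are
hypothetical names, and "forwarded to the task on the same task manager"
is simplified here to "same subtask index".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the forward / round-robin rule described above.
class PartitionSketch {

    /**
     * Returns the downstream subtask index chosen for each of `records`
     * records emitted by a single upstream subtask.
     * Equal parallelism: forward to the subtask with the same index.
     * Different parallelism: cycle through all downstream subtasks.
     */
    public static List<Integer> route(int upstreamIndex, int upstreamParallelism,
                                      int downstreamParallelism, int records) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            if (upstreamParallelism == downstreamParallelism) {
                targets.add(upstreamIndex);              // forward
            } else {
                targets.add(i % downstreamParallelism);  // round robin
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Non-parallel socket source (parallelism 1) feeding a map with parallelism 9.
        System.out.println(route(0, 1, 9, 10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 0]
        // Source and map both with parallelism 9: subtask 4 keeps its records local.
        System.out.println(route(4, 9, 9, 3));  // [4, 4, 4]
    }
}
```

With a parallelism-1 source, every map subtask receives records in turn,
which is why an explicit distribute() call changes nothing in this setup.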

Cheers,
Gyula


RE: Flink logs only written to one host

Posted by Emmanuel <el...@msn.com>.
Hi,
I changed my config to have a parallelism of 9 and 3 slots on each machine, and now it does distribute properly.
The other day I was told I could have many more slots than CPUs available and the system would distribute the load properly between the hosts with available CPU time, but that doesn't seem to be the case here: with 9 slots on each host, the distribute partitioning still sends all 9 tasks to the 9 slots on the same machine, even though it only has 3 CPUs available and the other 6 CPUs on the other machines were not running anything.
The idea of having more slots than CPUs, to me, is that you can spin up more jobs and they'll be balanced as data comes in. But with few jobs, it seems odd that the tasks are all assigned to the same host just because it has slots available.
Anyway, that fixed the issue for me for now. Looking forward to learning more about Flink.

RE: Flink logs only written to one host

Posted by Fabian Hueske <fh...@gmail.com>.
Can you check in the JM (JobManager) log file how many slots are available?

Slots are configured per TM (TaskManager). If you configure 9 slots and 3 TMs,
you end up with 27 slots, 9 on each TM.
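
The slot arithmetic, and why 9 subtasks can all land on one host, can be
sketched in plain Java. This is a simplified model and not Flink's
scheduler: the class SlotFillSketch and the "fill one TaskManager before
moving to the next" strategy are assumptions, chosen only because they
reproduce the behavior reported in this thread.

```java
import java.util.Arrays;

// Hypothetical model: slots are configured per TaskManager, and subtasks
// are assumed to fill one TaskManager's slots before using the next one.
class SlotFillSketch {

    /** Total slots in the cluster for a per-TaskManager slot setting. */
    public static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /** Number of subtasks placed on each TaskManager under fill-first placement. */
    public static int[] fillFirst(int taskManagers, int slotsPerTaskManager, int parallelism) {
        int[] placed = new int[taskManagers];
        int remaining = parallelism;
        for (int tm = 0; tm < taskManagers && remaining > 0; tm++) {
            int take = Math.min(slotsPerTaskManager, remaining);
            placed[tm] = take;
            remaining -= take;
        }
        return placed;
    }

    public static void main(String[] args) {
        System.out.println(totalSlots(3, 9));                     // 27
        // 9 slots per TM: all 9 subtasks fit on the first host.
        System.out.println(Arrays.toString(fillFirst(3, 9, 9)));  // [9, 0, 0]
        // 3 slots per TM: the same job has to spread over all 3 hosts.
        System.out.println(Arrays.toString(fillFirst(3, 3, 9)));  // [3, 3, 3]
    }
}
```

Under this model, setting the slots per TaskManager to the number of CPUs
(3 here) forces a job with parallelism 9 onto all three machines.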

RE: Flink logs only written to one host

Posted by Emmanuel <el...@msn.com>.
It appears that the slots used are actually all on the same host. My guess is that this is because I am using the default partitioning method (forward, which defaults to the same host).
However, I have now tried .shuffle() and .distribute() without any luck.
I have a
DataStream<String> text = env.socketTextStream(inHostName, inPort);
which is the one socket input stream. Adding
text.distribute().map(...)
does not seem to distribute the .map() process to the other hosts. Is this the correct way to use .distribute() on a stream input?
Thanks
Emmanuel
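
One way to check which host actually runs each subtask is to tag every debug line with the host and thread name before printing it. The following is a minimal sketch in plain Java using only the standard library; the class HostTag and its methods are hypothetical helpers, not part of Flink, and the assumption is that debug() would be called from inside the map function in place of a bare System.out.println().

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: prefixes debug output with the host and thread name,
// so lines collected from several taskmanager.out files can be told apart.
class HostTag {

    /** Best-effort local host name; falls back to a placeholder on lookup failure. */
    public static String hostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    /** Builds the tagged line without printing it, which keeps it easy to test. */
    public static String tag(String host, String thread, String message) {
        return "[" + host + " | " + thread + "] " + message;
    }

    /** Prints the message tagged with the current host and thread. */
    public static void debug(String message) {
        System.out.println(tag(hostName(), Thread.currentThread().getName(), message));
    }

    public static void main(String[] args) {
        debug("processing record");
    }
}
```

If every tagged line carries the same host name, all subtasks are running on one machine rather than stdout being aggregated somewhere.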