Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/09/08 14:32:10 UTC

Assigning operators to slots

Hi, first of all excuse me for the long post.
I have already read the documentation about parallelism, slots and the related
API, but I still have some doubts about how to implement them in practice.
My program is composed essentially of three operations:

- get data from a Kafka source
- apply a machine learning operator to the retrieved stream
- push data out to a Cassandra sink

I'd like to investigate and try to implement them in two different
situations:


1) FIRST ONE

Imagine I have a single dual-core physical node and suppose I set
NumberOfTaskSlots = NumberOfCores (as suggested by the docs).

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/tonabble.png> 

I suppose I can assign the operations to slots in a fixed way, as described
in the figure. Is this possible?
Can I do that using the slotSharingGroup(groupName) method? Or do I have to use
startNewChain() between the operators?
Example:

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .map(...)
    .slotSharingGroup("source");

or

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .startNewChain()
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .startNewChain()
    .map(...);


2) SECOND ONE

Imagine I have 3 dual-core physical nodes.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/tonabble2.png>

I suppose I can reserve one physical NODE for each operation. Is this
possible?
In this case, honestly, I don't know how to implement that at the code level.
Moreover, I don't know whether it would make sense to set NumberOfTaskSlots =
NumberOfCores or to leave this choice to Flink.








--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Assigning operators to slots

Posted by Elias Levy <fe...@gmail.com>.
The execution within the IDE is most likely not loading the flink-conf.yaml
file to read the configuration.  When run from the IDE you get a
LocalStreamEnvironment, which starts a LocalFlinkMiniCluster.
LocalStreamEnvironment is created by
StreamExecutionEnvironment.createLocalEnvironment without passing it any
configuration.  So none of StreamExecutionEnvironment, LocalStreamEnvironment,
and LocalFlinkMiniCluster tries to read the config file.

This makes it difficult to test certain Flink features from within the IDE,
as some configuration properties can't be set programmatically.  For
instance, you can't configure the external checkpoint URL in code.  It can
only be set in the config file.  That means you can't run a job that turns
on external checkpoints from within the IDE.

Ideally one of these components would try to load the config file when
executing locally.  You could then point it to the config file via
the FLINK_CONF_DIR environment variable.
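
In the meantime, a minimal sketch of a workaround (assuming the
createLocalEnvironment(parallelism, Configuration) overload of
StreamExecutionEnvironment; the slot count of 2 just mirrors the
flink-conf.yaml line from your mail) is to pass the configuration
programmatically when building the local environment:

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The local environment does not read flink-conf.yaml, so build the config by hand
Configuration conf = new Configuration();
// Same effect as "taskmanager.numberOfTaskSlots: 2" in flink-conf.yaml
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);

// Local environment whose embedded mini cluster now offers 2 task slots
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);

This only helps for properties that the mini cluster reads from the passed
Configuration (the slot count is one of them); others, like the external
checkpoint URL mentioned above, still need the config file.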


On Fri, Sep 8, 2017 at 8:47 AM, AndreaKinn <ki...@hotmail.it> wrote:

> UPDATE:
>
> I'm trying to implement the version with one node and two task slots on my
> laptop. I have also configured the following key in flink-conf.yaml:
>
> taskmanager.numberOfTaskSlots: 2
>
> but when I execute my program in the IDE I get:
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration.
>
> Parallelism is set to 1.
>
> What could be the problem?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Assigning operators to slots

Posted by AndreaKinn <ki...@hotmail.it>.
UPDATE:

I'm trying to implement the version with one node and two task slots on my
laptop. I have also configured the following key in flink-conf.yaml:

taskmanager.numberOfTaskSlots: 2

but when I execute my program in the IDE I get:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration.

Parallelism is set to 1.

What could be the problem?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Assigning operators to slots

Posted by AndreaKinn <ki...@hotmail.it>.
Nice, thank you for the reply.

So if I call slotSharingGroup(groupName) on the last operator, as here:

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .map(...)
    .slotSharingGroup("source");

is it applied to all the previous operators as well? Or do I have to call it after
each operator?
I.e. I want addSource, assignTimestampsAndWatermarks and map to reside in the same slot.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Assigning operators to slots

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

For the first question, I think both approaches should work. You only have to be careful about startNewChain() because the behaviour can be somewhat unexpected. What it does is specify that a new chain should be started with the operator on which you call startNewChain(). For example, in:

DataStream input = ...

input
  .map(...).name("map1")
  .map(...).name("map2")
  .startNewChain()
  .map(...).name("map3");

You will have one chain ("map1") and a second chain ("map2", "map3").

For the second question, I think that to make sure each operator runs on a separate machine you would have to set the number of slots per TaskManager to 1. This way you get 3 slots in total, and if you set the slot sharing groups or chaining right, each operator ends up in a different slot and therefore on a different machine.
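
To make that concrete, here is a rough sketch of how the three stages could be pinned to three different slot sharing groups (the group names "source", "ml" and "sink" are arbitrary, and MyLearningMapFunction / MyCassandraSinkFunction are just placeholders for your own ML operator and Cassandra sink):

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .slotSharingGroup("source")                                     // source stage in its own group
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor()); // inherits the "source" group

DataStream<MyEvent> scored = stream
    .map(new MyLearningMapFunction())     // placeholder for the ML operator
    .slotSharingGroup("ml");              // ML stage in a second group

scored
    .addSink(new MyCassandraSinkFunction())  // placeholder for the Cassandra sink
    .slotSharingGroup("sink");               // sink stage in a third group

With taskmanager.numberOfTaskSlots: 1 on each of the three TaskManagers, the three groups cannot share a slot, so each stage ends up on its own machine.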

Best,
Aljoscha
