You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Telco Phone <te...@yahoo.com> on 2017/04/18 13:38:16 UTC

Key by Task number

I am trying to use the task number as a keyby value to help fan out the work load reading from kafka.

Given:
        DataStream<SchemaRecord> stream =                env.addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties)                ).setParallelism(240).flatMap(new SchemaRecordSplit()).setParallelism(240).                        name("TopicA splitter").keyBy("partition", "keyByHelper", "schemaId");
        stream.addSink(new CustomMaprFsSink()).name("TopicA Sink").setParallelism(240);

In the DeserialClass I am trying to get to the 
getRuntimeContext().getIndexOfThisSubtask(); 

Which is only avaliable in the RichSinkFunction


The above is partition (by hour) , schemaID (avro schemaId) and I would like to add the task number so that all 240 readers / writers have something to do.
Any ideas ?



Re: Key by Task number

Posted by Telco Phone <te...@yahoo.com>.
Do you have a small example or a link to a doc that has one ?



      From: Chesnay Schepler <ch...@apache.org>
 To: user@flink.apache.org 
 Sent: Tuesday, April 18, 2017 8:16 AM
 Subject: Re: Key by Task number
   
 If the number of combinations between partition and schemaID is limited then the subtask index could actually improve the distribution of values.
 
 In any case, the easiest way to do this is to add a RichMapFunction after the flatMap, or modify the flatMap, to also include the subtask index.
 Typically this would be done by creating a Tuple2 containing the index and value.
 
 On 18.04.2017 15:43, Kamil Dziublinski wrote:
  
 I am not sure if you really need a keyby, your load will be distributed among your map function without it.  But could you explain a bit what is your sink doing? 
  
  As for setting parallelism on the consumer remember that you wont have higher parallelism than number of partitions in your topic. If you have 240 partitions that's fine, but if you have less than other subtasks will be idle. Only one task can read from one partition in parallel.  
  On Tue, Apr 18, 2017 at 3:38 PM Telco Phone <te...@yahoo.com> wrote:
  
   
  I am trying to use the task number as a keyby value to help fan out the work load reading from kafka. 
  
  Given: 
          DataStream<SchemaRecord> stream =                 env.addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties)                 ).setParallelism(240).flatMap(new SchemaRecordSplit()).setParallelism(240).                         name("TopicA splitter").keyBy("partition", "keyByHelper", "schemaId"); 
          stream.addSink(new CustomMaprFsSink()).name("TopicA Sink").setParallelism(240); 
  
  In the DeserialClass I am trying to get to the  
  getRuntimeContext().getIndexOfThisSubtask(); 
  
  Which is only avaliable in the RichSinkFunction 
  
  
  The above is partition (by hour) , schemaID (avro schemaId) and I would like to add the task number so that all 240 readers / writers have something to do. 
  Any ideas ? 
  
  
    
  
 
  

   

Re: Key by Task number

Posted by Chesnay Schepler <ch...@apache.org>.
If the number of combinations between partition and schemaID is limited 
then the subtask index could actually improve the distribution of values.

In any case, the easiest way to do this is to add a RichMapFunction 
after the flatMap, or modify the flatMap, to also include the subtask index.
Typically this would be done by creating a Tuple2 containing the index 
and value.

On 18.04.2017 15:43, Kamil Dziublinski wrote:
> I am not sure if you really need a keyby, your load will be 
> distributed among your map function without it.  But could you explain 
> a bit what is your sink doing?
>
>
> As for setting parallelism on the consumer remember that you wont have 
> higher parallelism than number of partitions in your topic.
> If you have 240 partitions that's fine, but if you have less than 
> other subtasks will be idle. Only one task can read from one partition 
> in parallel.
>
> On Tue, Apr 18, 2017 at 3:38 PM Telco Phone <telco5@yahoo.com 
> <ma...@yahoo.com>> wrote:
>
>
>     I am trying to use the task number as a keyby value to help fan
>     out the work load reading from kafka.
>
>
>     Given:
>
>            DataStream<SchemaRecord> stream =
>                     env.addSource(new
>     FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties)
>                   ).setParallelism(240).flatMap(new
>     SchemaRecordSplit()).setParallelism(240).
>                           name("TopicA splitter").keyBy("partition",
>     "keyByHelper", "schemaId");
>
>           stream.addSink(new CustomMaprFsSink()).name("TopicA
>     Sink").setParallelism(240);
>
>
>     In the DeserialClass I am trying to get to the
>
>     getRuntimeContext().getIndexOfThisSubtask();
>
>     Which is only avaliable in the RichSinkFunction
>
>
>
>     The above is partition (by hour) , schemaID (avro schemaId) and I
>     would like to add the task number so that all 240 readers /
>     writers have something to do.
>
>     Any ideas ?
>
>
>


Re: Key by Task number

Posted by Kamil Dziublinski <ka...@gmail.com>.
I am not sure if you really need a keyby, your load will be distributed
among your map function without it.  But could you explain a bit what is
your sink doing?


As for setting parallelism on the consumer remember that you wont have
higher parallelism than number of partitions in your topic.
If you have 240 partitions that's fine, but if you have less than other
subtasks will be idle. Only one task can read from one partition in
parallel.

On Tue, Apr 18, 2017 at 3:38 PM Telco Phone <te...@yahoo.com> wrote:

>
> I am trying to use the task number as a keyby value to help fan out the
> work load reading from kafka.
>
>
> Given:
>
>        DataStream<SchemaRecord> stream =
>                 env.addSource(new
> FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties)
>                 ).setParallelism(240).flatMap(new
> SchemaRecordSplit()).setParallelism(240).
>                         name("TopicA splitter").keyBy("partition",
> "keyByHelper", "schemaId");
>
>         stream.addSink(new CustomMaprFsSink()).name("TopicA
> Sink").setParallelism(240);
>
>
> In the DeserialClass I am trying to get to the
>
> getRuntimeContext().getIndexOfThisSubtask();
>
> Which is only avaliable in the RichSinkFunction
>
>
>
> The above is partition (by hour) , schemaID (avro schemaId) and I would
> like to add the task number so that all 240 readers / writers have
> something to do.
>
> Any ideas ?
>
>
>
>