Posted to user@flink.apache.org by Telco Phone <te...@yahoo.com> on 2017/03/22 02:41:15 UTC

Threading issue

I am trying to get the Kafka readers, the keyBy, and the sink all running with 60 parallel threads.
For the most part it is working correctly:
DataStream<SchemaRecord> stream = env
        .addSource(new FlinkKafkaConsumer08<>("kafkatopic", schema, properties))
        .setParallelism(60)
        .flatMap(new SchemaRecordSplit())
        .setParallelism(60)
        .name("RawAdActivity splitter")
        .keyBy("partition", "threadNumber", "schemaId");

stream.addSink(new CustomMaprFsSink())
        .setParallelism(60)
        .name("RawAdActivity Sink");
However, only about 24-30 of the 60 sinks actually write data.

The Kafka payload I am reading is keyed by time / schema, so to help things along I added a random number to each record and grouped by it as well, to "try" to force all 60 sinks to receive data and write to HDFS.
Any thoughts on how I can "help" the code above so that, for the most part, all 60 threads are reading, sorting, and sinking (writing to the file system) throughout the hour?
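
For illustration, the random-number trick amounts to "salting" the key. A minimal sketch of that idea, assuming SchemaRecord is a POJO with a public int salt field and splitStream is the output of the flatMap above (both names are hypothetical, not from the original code):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Give keyBy at least 60 distinct key values to hash across the sinks.
DataStream<SchemaRecord> salted = splitStream
        .map(new MapFunction<SchemaRecord, SchemaRecord>() {
            @Override
            public SchemaRecord map(SchemaRecord record) {
                // hypothetical field: a random salt in [0, 60)
                record.salt = ThreadLocalRandom.current().nextInt(60);
                return record;
            }
        });

salted.keyBy("partition", "threadNumber", "schemaId", "salt")
        .addSink(new CustomMaprFsSink())
        .setParallelism(60)
        .name("RawAdActivity Sink");

Note the trade-off: salting spreads the load, but records that share the original key no longer land on the same subtask, and even 60 salt values do not guarantee 60 busy sinks, since different keys can still hash to the same subtask.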

Re: Threading issue

Posted by Robert Metzger <rm...@apache.org>.
Hi,

How many unique combinations of your key ("partition", "threadNumber", "schemaId") exist?
In my opinion, all sinks should receive data if there are enough different keys.
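
Whether "enough" keys exist can be estimated offline. A minimal sketch, assuming Flink 1.2+ (whose runtime class KeyGroupRangeAssignment implements the key-to-subtask mapping that keyBy uses) and assuming the three-field key hashes like a Tuple3 of its values; the key-space bounds here are made up for illustration:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {
    public static void main(String[] args) {
        int parallelism = 60;
        // Flink derives the default max parallelism (the number of key
        // groups) from the operator parallelism; for 60 this is 128.
        int maxParallelism =
                KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        Set<Integer> subtasksWithData = new HashSet<>();
        // Hypothetical key space: 10 partitions x 6 threads x 1 schema = 60 keys.
        for (int partition = 0; partition < 10; partition++) {
            for (int thread = 0; thread < 6; thread++) {
                Tuple3<Integer, Integer, Integer> key = new Tuple3<>(partition, thread, 1);
                subtasksWithData.add(KeyGroupRangeAssignment
                        .assignKeyToParallelOperator(key, maxParallelism, parallelism));
            }
        }
        System.out.println(subtasksWithData.size() + " of " + parallelism
                + " sinks receive data");
    }
}

With roughly as many distinct keys as sink instances, hash collisions typically leave a sizable share of the 60 subtasks without any key, which is consistent with only 24-30 sinks writing data; the key space needs to be many times larger than the parallelism before every subtask is likely to receive data.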
