You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sanne de Roever <sa...@gmail.com> on 2017/10/11 12:52:34 UTC

Decouple Kafka partitions and Flink parallelism for ordered streams

Hi,

Currently we need 75 Kafka partitions per topic and a parallelism of 75 to
meet required performance, increasing the partitions and parallelism gives
diminished returns

Currently the performance is approx. 1500 msg/s per core, having one
pipeline (source, map, sink) deployed as one instance per core.

The Kafka source performance is not an issue. The map is very heavy
(deserialization, validation) on rather complex Avro messages. Object reuse
is enabled.

Ideally we would like to decouple Flink processing parallelism from Kafka
partitions in a following manner:

   - Pick a source parallelism
   - Per source, be able to pick a parallelism for the following map
   - In such a way that some message key determines which -local- map
   instance gets a message from a certain visitor
   - So that messages with the same visitor key get processed by the same
   map and in order for that visitor
   - Output the result to Kafka

AFAIK keyBy, partitionCustom will distribute messages over the network and
rescale has no affinity for message identity.

Am I missing something obvious?

Cheers,

Sanne

Fwd: Decouple Kafka partitions and Flink parallelism for ordered streams

Posted by Sanne de Roever <sa...@gmail.com>.
Hi Chesnay,

/** Fowarding this to group, I mistakingly replied to you directly
previously, apologies */

The side output option works in combination with setting slot sharing
groups. For reference I have included a source file. The job takes three
slots. One slot for input handling, and one slot for two maps each. In this
setup the messages are still processed in order per sub-stream.

In the web client the slots are not really visible, except for the used
slots count. A possible feature could be to color the tasks according to
the slot groups they are on.

This opens up new possibilities, and more importantly decouples Kafka
configuration from Flink configuration. Thank you very much for your input!

Cheers,

Sanne

package org.example

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * This example shows an implementation of WordCount with data from a
text socket.
  * To run the example make sure that the service providing the text
data is already up and running.
  *
  * To start an example socket text stream on your local machine run
netcat from a command line,
  * where the parameter specifies the port number:
  *
  * {{{
  *   nc -lk 9999
  * }}}
  *
  * Usage:
  * {{{
  *   SocketTextStreamWordCount <hostname> <port> <output path>
  * }}}
  *
  * This example shows how to:
  *
  *   - use StreamExecutionEnvironment.socketTextStream
  *   - write a simple Flink Streaming program in scala.
  *   - write and use user-defined functions.
  */
object SocketTextStreamWordCount {

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt
    val outputTag1 = OutputTag[String]("side-1")
    val outputTag2 = OutputTag[String]("side-2")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()

    //Create streams for names and ages by mapping the inputs to the
corresponding objects
    val text = env.socketTextStream(hostName,
port).slotSharingGroup("processElement")
    val counts = text.flatMap {
      _.toLowerCase.split("\\W+") filter {
        _.nonEmpty
      }
    }
      .process(new ProcessFunction[String, String] {
        override def processElement(
                                     value: String,
                                     ctx: ProcessFunction[String,
String]#Context,
                                     out: Collector[String]): Unit = {
          if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))
          else ctx.output(outputTag2, String.valueOf(value))
        }
      })

    val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
    val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

    val output1 = sideOutputStream1.map {
      (_, 1)
    }.slotSharingGroup("map1")
      .keyBy(0)
      .sum(1)

    val output2 = sideOutputStream2.map {
      (_, 1)
    }.slotSharingGroup("map2")
      .keyBy(0)
      .sum(1)

    output1.print()
    output2.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

}



On Thu, Oct 12, 2017 at 12:09 PM, Sanne de Roever <sanne.de.roever@gmail.com
> wrote:

> Hi Chesnay,
>
> Thanks for confirming the challenge and putting in the time to help out;
> enlightening.
>
> I wasn't aware of the Async I/O API in Flink, that looks promising; there
> might be some deployment questions to balance the number of slots vs the
> concurrency on machines.
>
> The side-output is also promising: the concurrency remains transparant
> deploymenty wise.
>
> Cheers,
>
> Sanne
>
> On Wed, Oct 11, 2017 at 5:36 PM, Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> I couldn't find a proper solution for this. The easiest solution might be
>> to use the Async I/O
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html>,
>> and do the validation
>> with an ExecutionService or similar in the map function.
>>
>> I've CC'd aljoscha, maybe he has another idea.
>>
>> The local partitioning solution is, theoretically, not impossible to do,
>> but it will not work with all sources and interact oddly with
>> checkpoints/savepoints when changing parallelism.
>>
>> Given a source parallelism S, and a map parallelism M, the idea is to
>> create S sub-plans,
>> each consisting of a distinct source and M map functions, and ensuring
>> that each runs
>> together (the latter part flink should already take care of).
>>
>> something like:
>>
>> for i in S:
>> 	source = createSeparateSource().setParallelism(1)
>> 	partitioned = source.partitionCustom(...)
>> 	partitions = []
>> 	for j in M:
>> 		partitions.add(partitioned.map(...).setParallelism(1).disableChaining())
>> 	union(partitions).write(...)
>>
>> This probably doesn't work with Kafka, since distinct kafka sources
>> cannot cooperate in distributing partitions AFAIK.
>> It also simply obliterates the concept of parallelism, which will make
>> modifications to the parallelism quite a pain when
>> checkpointing is enabled.
>>
>> I've written a sample job that uses side-outputs to do the partitioning
>> (since this was the first thing that came to mind),
>> attached below. Note that I essentially only wrote it to see what would
>> actually happen.
>>
>> public static void main(String[] args) throws Exception {
>>
>>    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();   List<DataStream<String>> sources = new ArrayList<>();   for (int x = 0; x < 6; x++) {
>>       sources.add(env.addSource(new SourceFunction<String>() {
>>          @Override         public void run(SourceContext<String> ctx) throws Exception {
>>             for (String w : WORDS) {
>>                ctx.collect(w);            }
>>             while(true) {
>>                Thread.sleep(5000);            }
>>          }
>>
>>          @Override         public void cancel() {
>>          }
>>       }));   }
>>
>>    int numMaps = 4;   for (int sourceIndex = 0; sourceIndex < sources.size(); sourceIndex++) {
>>
>>       DataStream<String> source = sources.get(sourceIndex);      List<OutputTag<String>> tags = new ArrayList<>(4);      for (int x = 0; x < numMaps; x++) {
>>          tags.add(new OutputTag<String>(sourceIndex + "-" + x) {
>>          });      }
>>
>>       SingleOutputStreamOperator<String> partitioned = source.process(new ProcessFunction<String, String>() {
>>          @Override         public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
>>             ctx.output(tags.get(value.hashCode() % tags.size()), value);         }
>>       });      List<DataStream<String>> toUnion = new ArrayList<>(tags.size());      for (OutputTag<String> tag : tags) {
>>          toUnion.add(partitioned.getSideOutput(tag)
>>             .map(new MapFunction<String, String>() {
>>                @Override               public String map(String value) throws Exception {
>>                   return tag.toString() + " - " + value;               }
>>             }).disableChaining());      }
>>
>>       DataStream<String> unionBase = toUnion.remove(0);      unionBase = unionBase.union(toUnion.toArray(new DataStream[0]));      unionBase.print();   }
>>
>>    // execute program   env.execute("Theory");
>>
>>
>> On 11.10.2017 16:31, Chesnay Schepler wrote:
>>
>> It is correct that keyBy and partition operations will distribute
>> messages over the network
>> as they distribute the data across all subtasks. For this use-case we
>> only want to consider
>> subtasks that are subsequent to our operator, like a local keyBy.
>>
>> I don't think there is an obvious way to implement it, but I'm currently
>> theory-crafting a bit
>> and will get back to you.
>>
>> On 11.10.2017 14:52, Sanne de Roever wrote:
>>
>> Hi,
>>
>> Currently we need 75 Kafka partitions per topic and a parallelism of 75
>> to meet required performance, increasing the partitions and parallelism
>> gives diminished returns
>>
>> Currently the performance is approx. 1500 msg/s per core, having one
>> pipeline (source, map, sink) deployed as one instance per core.
>>
>> The Kafka source performance is not an issue. The map is very heavy
>> (deserialization, validation) on rather complex Avro messages. Object reuse
>> is enabled.
>>
>> Ideally we would like to decouple Flink processing parallelism from Kafka
>> partitions in a following manner:
>>
>>    - Pick a source parallelism
>>    - Per source, be able to pick a parallelism for the following map
>>    - In such a way that some message key determines which -local- map
>>    instance gets a message from a certain visitor
>>    - So that messages with the same visitor key get processed by the
>>    same map and in order for that visitor
>>    - Output the result to Kafka
>>
>> AFAIK keyBy, partitionCustom will distribute messages over the network
>> and rescale has no affinity for message identity.
>>
>> Am I missing something obvious?
>>
>> Cheers,
>>
>> Sanne
>>
>>
>>
>>
>>
>>
>>
>

Re: Decouple Kafka partitions and Flink parallelism for ordered streams

Posted by Chesnay Schepler <ch...@apache.org>.
I couldn't find a proper solution for this. The easiest solution might 
be to use the Async I/O 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html>, 
and do the validation
with an ExecutionService or similar in the map function.

I've CC'd aljoscha, maybe he has another idea.

The local partitioning solution is, theoretically, not impossible to do, 
but it will not work with all sources and interact oddly with 
checkpoints/savepoints when changing parallelism.

Given a source parallelism S, and a map parallelism M, the idea is to 
create S sub-plans,
each consisting of a distinct source and M map functions, and ensuring 
that each runs
together (the latter part flink should already take care of).

something like:

for i in S:
	source = createSeparateSource().setParallelism(1)
	partitioned = source.partitionCustom(...)
	partitions = []
	for j in M:
		partitions.add(partitioned.map(...).setParallelism(1).disableChaining())
	union(partitions).write(...)

This probably doesn't work with Kafka, since distinct kafka sources 
cannot cooperate in distributing partitions AFAIK.
It also simply obliterates the concept of parallelism, which will make 
modifications to the parallelism quite a pain when
checkpointing is enabled.

I've written a sample job that uses side-outputs to do the partitioning 
(since this was the first thing that came to mind),
attached below. Note that I essentially only wrote it to see what would 
actually happen.

public static void main(String[] args) throws Exception { final 
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
List<DataStream<String>> sources = new ArrayList<>(); for (int x = 0; x 
< 6; x++) { sources.add(env.addSource(new SourceFunction<String>() { 
@Override public void run(SourceContext<String> ctx) throws Exception { 
for (String w : WORDS) { ctx.collect(w); } while(true) { 
Thread.sleep(5000); } } @Override public void cancel() { } })); } int 
numMaps = 4; for (int sourceIndex = 0; sourceIndex < sources.size(); 
sourceIndex++) { DataStream<String> source = sources.get(sourceIndex); 
List<OutputTag<String>> tags = new ArrayList<>(4); for (int x = 0; x < 
numMaps; x++) { tags.add(new OutputTag<String>(sourceIndex + "-" + x) { 
}); } SingleOutputStreamOperator<String> partitioned = 
source.process(new ProcessFunction<String, String>() { @Override public 
void processElement(String value, Context ctx, Collector<String> out) 
throws Exception { ctx.output(tags.get(value.hashCode() % tags.size()), 
value); } }); List<DataStream<String>> toUnion = new 
ArrayList<>(tags.size()); for (OutputTag<String> tag : tags) { 
toUnion.add(partitioned.getSideOutput(tag) .map(new MapFunction<String, 
String>() { @Override public String map(String value) throws Exception { 
return tag.toString() + " - " + value; } }).disableChaining()); } 
DataStream<String> unionBase = toUnion.remove(0); unionBase = 
unionBase.union(toUnion.toArray(new DataStream[0])); unionBase.print(); 
} // execute program env.execute("Theory");


On 11.10.2017 16:31, Chesnay Schepler wrote:
> It is correct that keyBy and partition operations will distribute 
> messages over the network
> as they distribute the data across all subtasks. For this use-case we 
> only want to consider
> subtasks that are subsequent to our operator, like a local keyBy.
>
> I don't think there is an obvious way to implement it, but I'm 
> currently theory-crafting a bit
> and will get back to you.
>
> On 11.10.2017 14:52, Sanne de Roever wrote:
>> Hi,
>>
>> Currently we need 75 Kafka partitions per topic and a parallelism of 
>> 75 to meet required performance, increasing the partitions and 
>> parallelism gives diminished returns
>>
>> Currently the performance is approx. 1500 msg/s per core, having one 
>> pipeline (source, map, sink) deployed as one instance per core.
>>
>> The Kafka source performance is not an issue. The map is very heavy 
>> (deserialization, validation) on rather complex Avro messages. Object 
>> reuse is enabled.
>>
>> Ideally we would like to decouple Flink processing parallelism from 
>> Kafka partitions in a following manner:
>>
>>   * Pick a source parallelism
>>   * Per source, be able to pick a parallelism for the following map
>>   * In such a way that some message key determines which -local- map
>>     instance gets a message from a certain visitor
>>   * So that messages with the same visitor key get processed by the
>>     same map and in order for that visitor
>>   * Output the result to Kafka
>>
>> AFAIK keyBy, partitionCustom will distribute messages over the 
>> network and rescale has no affinity for message identity.
>>
>> Am I missing something obvious?
>>
>> Cheers,
>>
>> Sanne
>>
>>
>>
>>
>


Re: Decouple Kafka partitions and Flink parallelism for ordered streams

Posted by Chesnay Schepler <ch...@apache.org>.
It is correct that keyBy and partition operations will distribute 
messages over the network
as they distribute the data across all subtasks. For this use-case we 
only want to consider
subtasks that are subsequent to our operator, like a local keyBy.

I don't think there is an obvious way to implement it, but I'm currently 
theory-crafting a bit
and will get back to you.

On 11.10.2017 14:52, Sanne de Roever wrote:
> Hi,
>
> Currently we need 75 Kafka partitions per topic and a parallelism of 
> 75 to meet required performance, increasing the partitions and 
> parallelism gives diminished returns
>
> Currently the performance is approx. 1500 msg/s per core, having one 
> pipeline (source, map, sink) deployed as one instance per core.
>
> The Kafka source performance is not an issue. The map is very heavy 
> (deserialization, validation) on rather complex Avro messages. Object 
> reuse is enabled.
>
> Ideally we would like to decouple Flink processing parallelism from 
> Kafka partitions in a following manner:
>
>   * Pick a source parallelism
>   * Per source, be able to pick a parallelism for the following map
>   * In such a way that some message key determines which -local- map
>     instance gets a message from a certain visitor
>   * So that messages with the same visitor key get processed by the
>     same map and in order for that visitor
>   * Output the result to Kafka
>
> AFAIK keyBy, partitionCustom will distribute messages over the network 
> and rescale has no affinity for message identity.
>
> Am I missing something obvious?
>
> Cheers,
>
> Sanne
>
>
>
>