Posted to dev@flink.apache.org by Felix Neutatz <ne...@googlemail.com> on 2016/11/10 11:24:48 UTC

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

Hi everybody,

the previous approach turned out to have an issue. Since we only write to
one subpartition, we have N-1 empty subpartitions per task (where N =
degree of parallelism). In the current approach I didn't consume these
empty subpartitions, and a subpartition that is never consumed is never
released. So we have a memory leak.

One workaround would be to read the empty subpartitions, but that is a
really ugly workaround.

So I had a chat with Till and we decided to create only one subpartition
instead of N subpartitions per task. I have already implemented this
approach.

Now the problem is that we need to know when to release this subpartition.
We will create M subpartition views per subpartition (where M is the number
of task managers and M <= N).

There are many ways to solve this problem:
1. Tell the subpartition how many taskmanagers will consume it
(=> propagate M).
2. All tasks which don't need to read the subpartition send a message to
the subpartition. The subpartition will then receive M release requests and
N-M "I am done" requests, so as long as the subpartition knows the degree
of parallelism N, we are fine (=> propagate N). A minimal sketch of this
counting follows below.
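To illustrate option 2, here is a minimal sketch of the release
bookkeeping, assuming a hypothetical subpartition class (the names
ReleaseCountingSubpartition, onConsumerDone() and onConsumerReleased() are
illustrative, not existing Flink API):

import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseCountingSubpartition {

    private final int parallelism; // N, propagated to the subpartition
    private final AtomicInteger acknowledged = new AtomicInteger(0);

    public ReleaseCountingSubpartition(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Called once for each of the N - M tasks that do not read the data. */
    public void onConsumerDone() {
        maybeRelease();
    }

    /** Called once for each of the M views that was fully consumed. */
    public void onConsumerReleased() {
        maybeRelease();
    }

    private void maybeRelease() {
        // Release exactly once, after all N tasks have responded.
        if (acknowledged.incrementAndGet() == parallelism) {
            releaseMemory();
        }
    }

    private void releaseMemory() {
        // free the buffers held by this subpartition
    }
}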

Any thoughts on how to tackle this problem?

Best regards,
Felix

2016-08-10 19:14 GMT+02:00 Till Rohrmann <tr...@apache.org>:

> Cool first version Felix :-)
>
> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Cool, nice results!
> >
> > For the iteration unspecialization - we probably should design this hand
> > in hand with the streaming fault tolerance, as they share the notion of
> > "intermediate result versions".
> >
> >
> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz <ne...@googlemail.com>
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I found a quick and dirty way to make the blocking subpartition readable
> > > by multiple readers. In the JobGraph generation I make all broadcast
> > > partitions blocking (see more details here:
> > > https://github.com/FelixNeutatz/incubator-flink/commits/blockingMultipleReads).
> > > I want to point out that this branch is only experimental!
> > >
> > > This works for the simple Map().withBroadcastSet() use case.
> > >
> > > To test this approach, I ran our peel bundle flink-broadcast (
> > > https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
> > > cluster. Ibm-power has 8 nodes, and we scale the number of slots per
> > > node from 1 - 16:
> > >
> > > broadcast.ibm-power-1 broadcast.01 6597.3333333333
> > > broadcast.ibm-power-1 broadcast.02 5997
> > > broadcast.ibm-power-1 broadcast.04 6576.6666666667
> > > broadcast.ibm-power-1 broadcast.08 7024.3333333333
> > > broadcast.ibm-power-1 broadcast.16 6933.3333333333
> > >
> > > The last column is the average run time in milliseconds over 3 runs.
> > > You can clearly see that the run time stays constant :)
> > >
> > > As discussed, this approach doesn't work yet for native iterations (see
> > > FLINK-1713).
> > >
> > > So in the next weeks I will work on the native iterations as Stephan
> > > proposed.
> > >
> > > Best regards,
> > > Felix
> > >
> > >
> > >
> > > 2016-08-09 21:29 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > I agree with Till. Changing the basic data exchange mechanism would
> > > > screw up many other ongoing efforts, like more incremental recovery.
> > > >
> > > > It seems that to make this properly applicable, we need to first
> > > > un-specialize the iterations.
> > > >
> > > > (1) Allow for "versioned" intermediate results, i.e.,
> > > > result-x-superstep1, result-x-superstep2, result-x-superstep3,
> > > > result-x-superstep4, ...
> > > > We need something similar for fine-grained recovery in streaming
> > > > (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> > > > result-x-checkpoint4, ...) so it may be worth addressing that soon
> > > > anyways.
> > > >
> > > > (2) Make iterations not dependent on the special local back channel.
> > > > Then we can simply schedule iterations like all other things.
> > > >
> > > > (3) Do the actual FLIP-5 proposal
> > > >
> > > >
> > > > That's quite an effort, but I fear all else will break the engine and
> > > > other efforts.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann <tr...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Felix,
> > > > >
> > > > > if we cannot work around the problem with blocking intermediate
> > > > > results in iterations, then we have to make FLINK-1713 a blocker for
> > > > > this new issue. But maybe you can also keep the current broadcasting
> > > > > mechanism to be used within iterations only. Then we can address the
> > > > > iteration problem later.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <neutatz@googlemail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Till,
> > > > > >
> > > > > > thanks for the fast answer. I also think this should be the way to
> > > > > > go. So should I open a new jira "Make blocking SpillableSubpartition
> > > > > > able to be read multiple times"? Moreover, should I mark this jira
> > > > > > and FLINK-1713 <https://issues.apache.org/jira/browse/FLINK-1713>
> > > > > > as blocking for the broadcast jira? What do you think?
> > > > > >
> > > > > > Best regards,
> > > > > > Felix
> > > > > >
> > > > > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann <tr...@apache.org>:
> > > > > >
> > > > > > > Hi Felix,
> > > > > > >
> > > > > > > I'm not sure whether PipelinedSubpartition should be readable
> > > > > > > more than once because then it would effectively mean that we
> > > > > > > materialize the elements of the pipelined subpartition for
> > > > > > > stragglers. Therefore, I think that we should make blocking
> > > > > > > intermediate results readable more than once. This will also be
> > > > > > > beneficial for interactive programs where we continue from the
> > > > > > > results of previous Flink jobs.
> > > > > > >
> > > > > > > It might also be interesting to have a blocking mode which
> > > > > > > schedules its consumers once the first result is there. Thus,
> > > > > > > having a mixture of pipelined and blocking mode.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neutatz@googlemail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Stephan,
> > > > > > > >
> > > > > > > > I did some research about blocking intermediate results. It
> > > > > > > > turns out that neither PipelinedSubpartition (see line 178) nor
> > > > > > > > blocking intermediate results (see SpillableSubpartition line:
> > > > > > > > 189) can be read multiple times. Moreover, blocking intermediate
> > > > > > > > results are currently not supported in native iterations (see
> > > > > > > > https://issues.apache.org/jira/browse/FLINK-1713).
> > > > > > > > So there are three ways to solve this:
> > > > > > > > 1) We extend pipelined subpartitions to make it possible to
> > > > > > > > read them multiple times.
> > > > > > > > 2) We extend blocking subpartitions to make it possible to read
> > > > > > > > them multiple times, but then we also have to fix FLINK-1713 so
> > > > > > > > we can use broadcasts in native iterations.
> > > > > > > > 3) We create one pipelined subpartition for every taskmanager.
> > > > > > > > Problem: the more taskmanagers there are, the more redundant
> > > > > > > > data we store, but the network traffic stays optimal.
> > > > > > > >
> > > > > > > > Thank you for your help,
> > > > > > > > Felix
> > > > > > > >
> > > > > > > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > > > > > > >
> > > > > > > > > Hi Felix!
> > > > > > > > >
> > > > > > > > > Hope this helps:
> > > > > > > > >
> > > > > > > > > Concerning (1.1) - The producer does not think in terms of
> > > > > > > > > the number of target TaskManagers. That number can, after all,
> > > > > > > > > change in the presence of a failure and recovery. The producer
> > > > > > > > > should, for its own result, not care how many consumers it
> > > > > > > > > will have (Tasks), but produce it only once.
> > > > > > > > >
> > > > > > > > > Concerning (1.2) - Only "blocking" intermediate results can be
> > > > > > > > > consumed multiple times. Data sent to broadcast variables must
> > > > > > > > > thus always be a blocking intermediate result.
> > > > > > > > >
> > > > > > > > > Greetings,
> > > > > > > > > Stephan
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neutatz@googlemail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Stephan,
> > > > > > > > > >
> > > > > > > > > > thanks for the great ideas. First I have some questions:
> > > > > > > > > >
> > > > > > > > > > 1.1) Does every task generate an intermediate result
> > > > > > > > > > partition for every target task, or is that already
> > > > > > > > > > implemented in a way so that there are only as many
> > > > > > > > > > intermediate result partitions per task manager as target
> > > > > > > > > > tasks? (Example: There are 2 task managers with 2 tasks
> > > > > > > > > > each. Do we get 4 intermediate result partitions per task
> > > > > > > > > > manager or do we get 8?)
> > > > > > > > > > 1.2) How can I consume an intermediate result partition
> > > > > > > > > > multiple times? When I tried that I got the following
> > > > > > > > > > exception:
> > > > > > > > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > > > > > > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf
> > > > > > > > > > is being or already has been consumed, but pipelined
> > > > > > > > > > subpartitions can only be consumed once.
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > > > > > > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > > > > > > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > > > > > > > > > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > > > > > > > >
> > > > > > > > > > My status update: Since Friday I have been implementing
> > > > > > > > > > your idea described in (2). Locally this approach already
> > > > > > > > > > works (for less than 170 iterations). I will investigate
> > > > > > > > > > further to solve that issue.
> > > > > > > > > >
> > > > > > > > > > But I am still not sure how to implement (1). Maybe we
> > > > > > > > > > introduce a construct similar to the BroadcastVariableManager
> > > > > > > > > > to share the RecordWriter among all tasks of a taskmanager.
> > > > > > > > > > I am interested in your thoughts :)
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Felix
> > > > > > > > > >
> > > > > > > > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <sewen@apache.org>:
> > > > > > > > > >
> > > > > > > > > > > Hi Felix!
> > > > > > > > > > >
> > > > > > > > > > > Interesting suggestion. Here are some thoughts on the design.
> > > > > > > > > > >
> > > > > > > > > > > The two core changes needed to send data once to the
> > > > > > > > > > > TaskManagers are:
> > > > > > > > > > >
> > > > > > > > > > >   (1) Every sender needs to produce its stuff once (rather
> > > > > > > > > > > than for every target task); there should not be redundancy
> > > > > > > > > > > there.
> > > > > > > > > > >   (2) Every TaskManager should request the data once; other
> > > > > > > > > > > tasks in the same TaskManager pick it up from there.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > The current receiver-initiated pull model is actually a
> > > > > > > > > > > good abstraction for that, I think.
> > > > > > > > > > >
> > > > > > > > > > > Let's look at (1):
> > > > > > > > > > >
> > > > > > > > > > >   - Currently, the TaskManagers have a separate
> > > > > > > > > > > intermediate result partition for each target slot. They
> > > > > > > > > > > should rather have one intermediate result partition
> > > > > > > > > > > (which also saves repeated serialization) that is consumed
> > > > > > > > > > > multiple times.
> > > > > > > > > > >
> > > > > > > > > > >   - Since the results that are to be broadcasted are
> > > > > > > > > > > always "blocking", they can be consumed (pulled) multiple
> > > > > > > > > > > times.
> > > > > > > > > > >
> > > > > > > > > > > Let's look at (2):
> > > > > > > > > > >
> > > > > > > > > > >   - The current BroadcastVariableManager has the
> > > > > > > > > > > functionality to let the first accessor of the BC-variable
> > > > > > > > > > > materialize the result.
> > > > > > > > > > >
> > > > > > > > > > >   - It could be changed such that only the first accessor
> > > > > > > > > > > creates a RecordReader, so the others do not even request
> > > > > > > > > > > the stream. That way, the TaskManager should pull only one
> > > > > > > > > > > stream from each producing task, which means the data is
> > > > > > > > > > > transferred once.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > That would also work perfectly with the current failure /
> > > > > > > > > > > recovery model.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Stephan
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neutatz@googlemail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everybody,
> > > > > > > > > > > >
> > > > > > > > > > > > I want to improve the performance of broadcasts in
> > > > > > > > > > > > Flink. Therefore Till told me to start a FLIP on this
> > > > > > > > > > > > topic to discuss how to go forward to solve the current
> > > > > > > > > > > > issues for broadcasts.
> > > > > > > > > > > >
> > > > > > > > > > > > The problem in a nutshell: Instead of sending data to
> > > > > > > > > > > > each taskmanager only once, at the moment the data is
> > > > > > > > > > > > sent to each task. This means if there are 3 slots on
> > > > > > > > > > > > each taskmanager, we will send the data 3 times instead
> > > > > > > > > > > > of once.
> > > > > > > > > > > >
> > > > > > > > > > > > There are multiple ways to tackle this problem, and I
> > > > > > > > > > > > started to do some research and investigation. You can
> > > > > > > > > > > > follow my thought process here:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > > > > > > > > >
> > > > > > > > > > > > This is my first FLIP, so please correct me if I did
> > > > > > > > > > > > something wrong.
> > > > > > > > > > > >
> > > > > > > > > > > > I am interested in your thoughts about how to solve this
> > > > > > > > > > > > issue. Do you think my approach is heading in the right
> > > > > > > > > > > > direction, or should we follow a totally different one?
> > > > > > > > > > > >
> > > > > > > > > > > > I am happy about any comment :)
> > > > > > > > > > > >
> > > > > > > > > > > > Best regards,
> > > > > > > > > > > > Felix
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

Posted by Felix Neutatz <ne...@googlemail.com>.
Hi everybody,

I implemented the second approach (see
https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts).
So each subpartition will be read by M tasks (M = number of task managers),
and the other tasks will notify the subpartition that they don't need to
read it. This solves the problem: we release the subpartition exactly when
we don't need it anymore.

The message I send for all tasks which don't need to read is
"notifySubpartitionConsumed()":
https://github.com/FelixNeutatz/incubator-flink/blob/1b58d9c9df89620f2557b59e7fde40ffe04f49d8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L460

This means I first have to connect via a PartitionRequest, and then I
notify all channels.
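As a sketch, what a non-reading task does is roughly the following (this is
illustrative: gate.getAllInputChannels() and a public
notifySubpartitionConsumed() on the channel follow my branch, not upstream
Flink):

import java.io.IOException;

import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

public class BroadcastSkipHelper {

    // Connect once via PartitionRequest, then tell every channel that this
    // task will not read the broadcast data (the "I am done" message).
    static void skipBroadcastInput(SingleInputGate gate)
            throws IOException, InterruptedException {
        gate.requestPartitions();
        for (InputChannel channel : gate.getAllInputChannels()) {
            channel.notifySubpartitionConsumed();
        }
    }
}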
One problem with using the standard PartitionRequest is that we will
already fill the first buffer:
https://github.com/FelixNeutatz/incubator-flink/blob/oneSubpartition/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L123

So my question is: is that ok, or should we
1) introduce another Netty message, PartitionRequestAndNotifyConsumed, or
2) extend the PartitionRequest with an attribute "boolean getAhead"?
(A sketch of option 2 follows below.)

Current problems:
Native iterations:
Native iterations work but are not optimized. Theoretically, in the case of
native iterations we can also notify the subpartitions instead of reading
them, but at the moment I get the following exception when I do so:
java.lang.IllegalStateException: Queried for a buffer before requesting the subpartition.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:152)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:424)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:87)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:114)

In general, it seems to me that this reduces network traffic even for
pipelined partitions in the "old" architecture. I will further investigate
why this is not working. For a simple map job with pipelined partitions
this already works, so there has to be some iteration-specific behaviour
which keeps it from working.
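For reference, the kind of simple map job this already works for is the
standard DataSet broadcast pattern (the element values below are made up
for illustration):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BroadcastExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
        DataSet<String> data = env.fromElements("a", "b", "c");

        data.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Each task reads the broadcast variable; FLIP-5 aims to
                // ship the underlying data to each taskmanager only once.
                List<Integer> numbers =
                        getRuntimeContext().getBroadcastVariable("numbers");
                return value + ":" + numbers.size();
            }
        }).withBroadcastSet(toBroadcast, "numbers")
          .print();
    }
}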

But for now I would propose to first push the improvements to Flink
without the iteration improvements. The overhead of keeping the two code
paths is just 2 lines of code, which is really little.

I am happy to hear your thoughts :)

Best regards,
Felix
