Posted to user@flink.apache.org by boci <bo...@gmail.com> on 2017/08/28 21:29:41 UTC

Union limit

Hi guys!

I have one input (from MongoDB) and I split the incoming data into multiple
datasets (each created dynamically from configuration), and before I write
the result back I want to merge them into one dataset (there are some common
transformations).
So the flow is:

DataSet from MongoDB =>
Create mappers dynamically (currently 74), so I have 74 DataSets =>
Custom filter and mapping on each dataset =>
Union them dynamically into one (every mapper's result has the same type) =>
Some other common transformations =>
Count the result

but when I want to union more than 64 datasets I get this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException:
Cannot currently handle nodes with more than 64 outputs.
at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried to split the incoming list of 74 datasets into 60 + 14 datasets,
create an id mapper, and union the resulting datasets, but with no success:

val listOfDataSet: List[DataSet[...]] = ....

listOfDataSet
  .sliding(60, 60) // yields an iterator over groups of at most 60 DataSets
  .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
  .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // Here I get the exception
  .map(finalDataSet => ... some transformation ...)
  .count()

Is there any solution to this?

Thanks
b0c1

Re: Union limit

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

the following code should do what you want.
I included an implementation of an IdMapper.
At the end, I print the execution plan that is generated after optimization
(so the pipeline works up to that point).

Best, Fabian

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val data: Seq[Seq[Int]] = (1 until 315).map(i => Seq(1, 2, 3))

val dataSets: Seq[DataSet[Int]] = data.map(env.fromCollection(_))

dataSets.sliding(60, 60)
  .map(dsg => dsg.reduce((ds1: DataSet[Int], ds2: DataSet[Int]) => ds1.union(ds2))
    .map(new IdMapper[Int]()))
  .reduce((dsg1: DataSet[Int], dsg2: DataSet[Int]) => dsg1.union(dsg2))
  .map(x => x * 2) // do something with the union result
  .output(new DiscardingOutputFormat[Int])

println(env.getExecutionPlan())

// Identity mapper: forms a separate operator between unions, so the
// optimizer cannot merge everything into one n-ary union.
class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}
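A possible generalization of the snippet above (a sketch, not from the
original mails; the helper name unionAll and the batch parameter are
illustrative, and it reuses the IdMapper defined above):

import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation

// Sketch: unions any number of DataSets in batches of at most `batch`,
// inserting an identity map after each batch so that no merged n-ary
// union sees more than 64 inputs. Handles up to batch * 64 datasets;
// for more, apply the same trick recursively.
def unionAll[T: TypeInformation: ClassTag](sets: Seq[DataSet[T]],
                                           batch: Int = 60): DataSet[T] =
  sets.sliding(batch, batch)
    .map(_.reduce(_ union _).map(new IdMapper[T]()))
    .reduce(_ union _)

// Usage sketch: val merged = unionAll(listOfDataSet)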


Re: Union limit

Posted by boci <bo...@gmail.com>.
Dear Fabian,

Thanks for your answer (I think you said the same on StackOverflow), but as
you can see in my code, your solution does not work:

Here is the code: it splits the datasets into groups (each group contains at
most 60 datasets).
After that, I reduce each group using union, map it with an IdMapper, and
return the id-mapped dataset.
But in the next reduce (where I want to merge the id-mapped datasets), Flink
says I reached the limit.

Maybe my IdMapper is wrong... Can you show a correct, working IdMapper?

b0c1

PS: here is the code segment:

listOfDataSet
  .sliding(60, 60) // yields an iterator over groups of at most 60 DataSets
  .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
  .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // Here I get the exception
  .map(finalDataSet => ... some transformation ...)
  .count()




Re: Union limit

Posted by Fabian Hueske <fh...@gmail.com>.
Hi b0c1,

This is a limitation in Flink's optimizer.
Internally, all binary unions are merged into a single n-ary union, and the
optimizer restricts the number of inputs of an operator to 64.

You can work around this limitation with an identity mapper, which prevents
the union operators from being merged:

in1  ---\
in2  ----+-- Id-Map --+-- NextOp
...      |            |
in14 ---/             |
                      |
in15 -----------------+
...                   |
in74 -----------------/
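
In code, the structure of this diagram might look like the following
(a sketch for illustration, assuming the 74 inputs are available as a
Seq[DataSet[Int]] named inputs, with the Flink Scala API imports and the
IdMapper from the earlier mail in scope):

val head = inputs.take(14)
  .reduce(_ union _)          // 14 inputs, safely under the 64-input limit
  .map(new IdMapper[Int]())   // identity map breaks the union chain here

val merged = inputs.drop(14)  // the remaining 60 inputs
  .foldLeft(head)(_ union _)  // 61 inputs feed the second n-ary union

// `merged` is what the diagram calls NextOp's input.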

This is not a super nice solution, but it is the only way that comes to mind.

Cheers, Fabian
