Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2017/09/06 08:28:47 UTC

Re: Union limit

Hi,

The following code should do what you want.
I included an implementation of an IdMapper.
At the end, I print the execution plan, which is generated by the
optimizer (so if the plan prints, the program passed optimization without
hitting the limit).

Best, Fabian

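import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// 314 small data sets, well above the limit of 64
val data: Seq[Seq[Int]] = (1 until 315).map(i => Seq(1, 2, 3))

val dataSets: Seq[DataSet[Int]] = data.map(env.fromCollection(_))

dataSets.sliding(60, 60) // groups of at most 60 data sets
  .map(dsg => dsg.reduce((ds1: DataSet[Int], ds2: DataSet[Int]) => ds1.union(ds2))
    .map(new IdMapper[Int]())) // union each group, then apply the identity mapper
  .reduce((dsg1: DataSet[Int], dsg2: DataSet[Int]) => dsg1.union(dsg2)) // union the group results
  .map(x => x * 2) // do something with the union result
  .output(new DiscardingOutputFormat[Int])

// Generating the plan runs the optimizer
println(env.getExecutionPlan())

// Identity mapper: forwards every record unchanged
class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}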
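
The grouping step can also be written as a reusable helper. The following is only a minimal sketch, not Flink API: `ChunkedUnion`, `unionInChunks`, `maxFanIn` and `barrier` are hypothetical names, and plain Scala collections stand in for DataSets so that it runs without Flink. It unions at most `maxFanIn` inputs per level and applies `barrier` (the role the IdMapper plays above) to each group result, repeating until few enough inputs remain.

```scala
// Hypothetical helper, not part of Flink. It folds any number of inputs with
// a binary `union`, combining at most `maxFanIn` inputs per level.
object ChunkedUnion {
  @annotation.tailrec
  def unionInChunks[A](inputs: Seq[A], maxFanIn: Int)
                      (union: (A, A) => A)
                      (barrier: A => A): A = {
    require(inputs.nonEmpty, "need at least one input")
    require(maxFanIn >= 2, "maxFanIn must be at least 2")
    if (inputs.size <= maxFanIn) {
      // Few enough inputs: union them directly.
      inputs.reduce(union)
    } else {
      // Union each group, apply the barrier to the group result,
      // then repeat with the (fewer) group results.
      val next = inputs.grouped(maxFanIn)
        .map(group => barrier(group.reduce(union)))
        .toVector
      unionInChunks(next, maxFanIn)(union)(barrier)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in data: 314 inputs of 3 elements each, as in the example above.
    val inputs: Seq[Seq[Int]] = (1 until 315).map(_ => Seq(1, 2, 3))
    var barriers = 0 // counts how often the barrier is applied
    val merged = unionInChunks(inputs, 60)(_ ++ _) { s => barriers += 1; s }
    // prints "elements = 942, barriers = 6"
    println(s"elements = ${merged.size}, barriers = $barriers")
  }
}
```

With Flink, the call would presumably look like `unionInChunks(dataSets, 60)(_ union _)(_.map(new IdMapper[Int]()))`; I have not run that variant, so treat it as an assumption.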

2017-08-31 12:30 GMT+02:00 boci <bo...@gmail.com>:

> Dear Fabian,
>
> Thanks for your answer (I think you said the same on StackOverflow), but as
> you can see in my code, your solution does not work:
>
> Here is the code. It splits the data sets into lists (each list contains
> at most 60 data sets).
> After that, I reduce each list using union, map the result with an IdMapper,
> and return the id-mapped data set.
> But at the next reduce (where I want to merge the id-mapped data sets),
> Flink says I reached the limit.
>
> Maybe my IdMapper is wrong... Can you show a correct, working IdMapper?
>
> b0c1
>
> ps:
> Here is the code segment:
> listOfDataSet
> .sliding(60,60)
> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
> //There is an iterator of DataSet
> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
> .map(finalDataSet => ... some transformation ...)
> .count()
>
>
>
> On Wed, 30 Aug 2017 at 15:44 Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi b0c1,
>>
>> This is a limitation in Flink's optimizer.
>> Internally, all binary unions are merged into a single n-ary union. The
>> optimizer restricts the number of inputs for an operator to 64.
>>
>> You can work around this limitation with an identity mapper which
>> prevents the union operators from merging:
>>
>> in1  ---\
>> in2  ----+--- Id-Map ---+--- NextOp
>> ...     /               |
>> in14 --/                |
>>                         |
>> in15 -------------------+
>> ...                     |
>> in74 -------------------+
>>
>> This is not a super nice solution, but the only way that comes to my mind.
>>
>> Cheers, Fabian
>>
>> 2017-08-28 23:29 GMT+02:00 boci <bo...@gmail.com>:
>>
>>> Hi guys!
>>>
>>> I have one input (from mongo) and I split the incoming data into multiple
>>> data sets (each created dynamically from configuration). Before I write
>>> back the result, I want to merge them into one data set (there is some
>>> common transformation).
>>> The flow is:
>>>
>>> DataSet from Mongod =>
>>> Create mappers dynamically (currently 74), so I have 74 DataSets =>
>>> Custom filter and mapping on each DataSet =>
>>> Union dynamically into one (every mapper result has the same type) =>
>>> Another common transformation =>
>>> Count the result
>>>
>>> But when I try to union more than 64 data sets, I get this exception:
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
>>> at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
>>> at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)
>>>
>>> I tried to split the incoming list of 74 data sets into 60 + 14
>>> data sets, apply an id mapper, and union the resulting data sets, but
>>> without success:
>>>
>>> val listOfDataSet: List[DataSet[...]] = ....
>>>
>>> listOfDataSet
>>> .sliding(60,60)
>>> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
>>> //There is an iterator of DataSet
>>> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
>>> .map(finalDataSet => ... some transformation ...)
>>> .count()
>>>
>>> Is there any way to solve this?
>>>
>>> Thanks
>>> b0c1
>>>
>>
>>