Posted to user@flink.apache.org by Maximilian Alber <al...@gmail.com> on 2014/11/11 14:09:26 UTC
Re: No Nested Iterations??? And where is the Nested Iteration?
Hi Stephan,
Have you already had time to investigate this issue?
Cheers,
Max
On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
> Hey!
>
> Clearly, this looks like a bug. Let me investigate and get back to you
> later...
>
> Greetings,
> Stephan
>
>
> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi Flinksters!
>>
>> First, some good news: the cumsum code from the last issue now works
>> correctly and is tested.
>>
>> Bad news (at least for me): I just ran into this (see below for the error
>> and code). Do you have a roadmap for when this feature will be available?
>> Regardless of the rest, I will need it in the near future.
>>
>> So far so good. But I wonder where this nested iteration is supposed to
>> be. At least I do not see one... I have an iteration with a lot of
>> filters/maps/etc. inside, but not another iteration.
>>
>> Cheers,
>> Max
>>
>> Error:
>>
>> org.apache.flink.compiler.CompilerException: An error occurred while
>> translating the optimized plan to a nephele JobGraph: An error occurred
>> while translating the optimized plan to a nephele JobGraph: Nested
>> Iterations are not possible at the moment!
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>> at org.apache.flink.client.program.Client.run(Client.java:290)
>> at org.apache.flink.client.program.Client.run(Client.java:285)
>> at org.apache.flink.client.program.Client.run(Client.java:230)
>> at
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> Caused by: org.apache.flink.compiler.CompilerException: An error occurred
>> while translating the optimized plan to a nephele JobGraph: Nested
>> Iterations are not possible at the moment!
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>> ... 14 more
>> Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations
>> are not possible at the moment!
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>> ... 33 more
>>
>> Code:
>>
>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>> val X = env readTextFile config.xFile map
>> {Vector.parseFromString(config.dimensions, _)}
>> val residual = env readTextFile config.yFile map
>> {Vector.parseFromString(_)}
>> val randoms = env readTextFile config.randomFile map
>> {Vector.parseFromString(_)}
>> val widthCandidates = env readTextFile config.widthCandidatesFile map
>> {Vector.parseFromString(config.dimensions, _)}
>>
>> val center = calcCenter(env, X, residual, randoms, 0)
>>
>> val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>
>> x map { _ toString } writeAsText config.outFile
>> }
>>
>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
>> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
>> = {
>> val residual_2 = residual * residual
>> val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>> neutralize)
>>
>> val emptyDataSet = env.fromCollection[Vector](Seq())
>> val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>> val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>> config.N+1, Array("id")) {
>> (solutionset, workset) =>
>> val current = workset filter (new RichFilterFunction[Vector]{
>> def filter(x: Vector) = x.id ==
>> (getIterationRuntimeContext.getSuperstepNumber-1)
>> })
>> val old_sum = workset filter {_.id == -1}
>> val sum = VectorDataSet.add(old_sum, current.neutralize())
>>
>> val new_workset = workset filter {_.id != -1} union sum
>> (sum map (new RichMapFunction[Vector, Vector]{
>> def map(x: Vector) = new
>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>> }),
>> new_workset)
>> }
>> val index = cumSum.filter(new RichFilterFunction[Vector](){
>> var y: Vector = null
>> override def open(config: Configuration) = {
>> y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>> }
>> def filter(x: Vector) = x.values(0) < y.values(0)
>> }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>
>> val center = X.filter(new RichFilterFunction[Vector](){
>> var index: Int = -1
>> override def open(config: Configuration) = {
>> val x: Tuple1[Int] =
>> getRuntimeContext.getBroadcastVariable("index").toList.head
>> index = x._1
>> }
>> def filter(x: Vector) = x.id == index
>> }).withBroadcastSet(index, "index")
>>
>> center neutralize
>> }
>>
>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width:
>> DataSet[Vector]): DataSet[Vector] = {
>> X.map(new RichMapFunction[Vector, Vector]{
>> var center: Vector = null
>> var width: Vector = null
>> override def open(config: Configuration) = {
>> center =
>> getRuntimeContext.getBroadcastVariable("center").toList.head
>> width = getRuntimeContext.getBroadcastVariable("width").toList.head
>> }
>>
>> def map(x: Vector) = new Vector(x.id,
>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>> }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
>> }
>>
>>
>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>> DataSet[Vector]): DataSet[Vector] = {
>> val emptyDataSet = env.fromCollection[Vector](Seq())
>> val costs = emptyDataSet.iterateDelta(widthCandidates,
>> config.NWidthCandidates, Array("id")) {
>> (solutionset, workset) =>
>> val currentWidth = workset filter (new RichFilterFunction[Vector]{
>> def filter(x: Vector) = x.id ==
>> (getIterationRuntimeContext.getSuperstepNumber-1)
>> })
>>
>> val kernelVector = getKernelVector(X, center, currentWidth)
>>
>> val x1 = kernelVector dot residual map {x => x*x}
>> val x2 = kernelVector dot kernelVector
>>
>> val cost = (x1 / x2) neutralize
>>
>>
>> (cost map (new RichMapFunction[Vector, Vector]{
>> def map(x: Vector) = new
>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>> }),
>> workset)
>> }
>>
>> // todo: will not work
>> //val width = costs max(0)
>>
>> //val kernelVector = getKernelVector(X, center, width)
>>
>> //val x1 = kernelVector dot residual
>> //val x2 = kernelVector dot kernelVector
>> //val height = x1 / x2
>> costs
>> }
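The cumSum delta iteration in calcCenter above is easier to follow when the superstep logic is written as a plain Scala loop. The sketch below mirrors that logic only; it is not Flink API, and `Vec` and `cumSumSteps` are illustrative stand-ins for the poster's Vector type:

```scala
// Plain-Scala sketch of the cumSum delta iteration's superstep logic.
// Vec(id, value) stands in for the poster's Vector type; the running
// sum is kept under the sentinel id -1, as in the original workset.
case class Vec(id: Int, value: Double)

def cumSumSteps(values: Seq[Double]): Seq[Vec] = {
  var workset: Seq[Vec] =
    Vec(-1, 0.0) +: values.zipWithIndex.map { case (v, i) => Vec(i, v) }
  var solution: Seq[Vec] = Seq()
  for (superstep <- 1 to values.length) {
    // pick this superstep's element and the running sum
    val current = workset.find(_.id == superstep - 1).get
    val oldSum  = workset.find(_.id == -1).get
    val sum     = Vec(-1, oldSum.value + current.value)
    // as in the original: drop the old sum, append the new one
    workset  = workset.filter(_.id != -1) :+ sum
    // the delta written to the solution set carries the superstep index
    solution = solution :+ Vec(superstep - 1, sum.value)
  }
  solution
}
```

Called on `Seq(1.0, 2.0, 3.0)`, this yields the running sums 1.0, 3.0, 6.0 under ids 0, 1, 2.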
>>
>
>
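As an aside, the Gaussian-style kernel computed in getKernelVector above reduces to a few lines of plain Scala. This is a sketch of the math only, not Flink code; the array arguments stand in for the Vector class:

```scala
// k(x) = exp(-sum(((x_i - c_i)^2) / w_i)): the per-element computation
// behind getKernelVector, written without Flink. Arrays stand in for
// the poster's Vector type; names are illustrative.
def kernel(x: Array[Double], center: Array[Double], width: Array[Double]): Double = {
  val exponent = x.indices.map { i =>
    val d = x(i) - center(i)
    d * d / width(i)
  }.sum
  math.exp(-exponent)
}
```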
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Maximilian Alber <al...@gmail.com>.
The error is gone.
Thanks!
Cheers,
Max
On Wed, Nov 12, 2014 at 11:41 PM, Stephan Ewen <se...@apache.org> wrote:
> The current master contains a fix for the incorrectly identified nested
> iteration bug.
>
> Please let us know if it fixes your problem!
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 12, 2014 at 5:19 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Talking at the meetup sounds good!
>>
>> On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok. By for-loop style, do you mean a loop with a fixed range?
>>> In my case I would have a delta-iteration inside a bulk-iteration. I
>>> guess that wouldn't be "roll-out-able"?
>>>
>>> Btw, is there any intention to allow bulk-style iterations over several
>>> datasets "concurrently"?
>>>
>>> Maybe we could discuss my problem next week at the meetup?
>>>
>>> Thank you for the offer, but I'm in the middle of my thesis, so I don't
>>> have time for it.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> We are not planning to add closed-loop nested iterations in the near
>>>> future. That is a bit of an effort, and I think no one can pick it up
>>>> very soon.
>>>>
>>>> We will be supporting roll-out iterations (for-loop style) much more
>>>> efficiently soon. There is no reason why you could not nest two for-loops.
>>>> However, those are only bulk-style, not delta-iteration style.
>>>>
>>>> If you would like to contribute iteration nesting, I could help you to
>>>> get started.
>>>>
>>>> Greetings,
>>>> Stephan
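The roll-out (for-loop-style) iteration Stephan describes can be pictured as a driver-side loop that re-applies the step function, so nesting two of them is just nesting two ordinary loops. A minimal sketch in plain Scala, where `rollOut`, `step`, and `nestedExample` are illustrative names, not Flink API:

```scala
// "Roll-out" iteration sketch: the loop is unrolled in the driver, so
// each pass just appends another copy of the step to the plan, and
// nesting is unproblematic. Plain Scala; not Flink API.
def rollOut[T](initial: T, steps: Int)(step: T => T): T =
  (1 to steps).foldLeft(initial)((state, _) => step(state))

// two nested rolled-out loops: 2 outer passes of 3 inner increments,
// i.e. every element gains 2 * 3 = 6 in total
def nestedExample: Seq[Double] =
  rollOut(Seq(1.0, 2.0), 2) { outer =>
    rollOut(outer, 3)(inner => inner.map(_ + 1.0))
  }
```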
>>>>
>>>>
>>>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Oh sorry, I just read the bug title. So my question is: when are you
>>>>> planning to add nested iterations?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Ok, thanks.
>>>>>>
>>>>>> But does the bug cause Flink to "see" a nested iteration where none
>>>>>> exists?
>>>>>> Or is it a bug that nested iterations are not supported? If so, when do
>>>>>> you plan to add this feature?
>>>>>> I need nested iterations for my algorithm, so it would be nice to know
>>>>>> when I can expect them.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>>>
>>>>>>> You can watch that one to keep updated.
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> I am looking into it right now...
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Stephan Ewen <se...@apache.org>.
The current master contains a fix for the incorrectly identified nested
iteration bug.
Please let us know if it fixes your problem!
Greetings,
Stephan
On Wed, Nov 12, 2014 at 5:19 PM, Stephan Ewen <se...@apache.org> wrote:
> Talking at the meetup sounds good!
>
> On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Ok. With for loop style you intend a loop with a fixed range?
>> In my case I would have a delta-iteration inside a bulk-iteration. I
>> guess wouldn't be "roll-out-able"?
>>
>> Btw is there any intention to allow bulk-style iterations on several
>> datasets "concurrently"?
>>
>> Maybe we could discuss my problem next week at the meetup?
>>
>> Thank you for the offer, but I'm in the middle of thesis, thus I don't
>> have time for it.
>>
>> Cheers,
>> Max
>>
>> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> We are not planning to add closed-loop nested iterations in the near
>>> future. That is a bit of an effort and so far, and I think no one can pick
>>> that up very soon.
>>>
>>> We will be supporting roll-out iterations (for loop style) much more
>>> efficiently soon. There is no reason why you could not nest two for-loops.
>>> However, those are only bulk-style, not delta-iteration style.
>>>
>>> If you would like to contribute iteration nesting, I could help you to
>>> get started.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Oh sorry, I just read the bug title. So my questions is when you are
>>>> planning to add nested iterations?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Ok, thanks.
>>>>>
>>>>> But the bug causes that it Flink "sees" a nested iteration where none
>>>>> is?
>>>>> Or is it a bug that nested are not supported? If not when you plan to
>>>>> add this feature?
>>>>> Because I need nested iterations for my algorithm, so it would be nice
>>>>> to know when I can expect them.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>>
>>>>>> You can watch that one to keep updated.
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> I am looking into it right now...
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stephan,
>>>>>>>>
>>>>>>>> you already had time to investigate this issue?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey!
>>>>>>>>>
>>>>>>>>> Clearly, this looks like a bug. Let me investigate that and get
>>>>>>>>> back at you later...
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flinksters!
>>>>>>>>>>
>>>>>>>>>> First some good news: the cumsum code from the last issue works
>>>>>>>>>> now correctly and is tested.
>>>>>>>>>>
>>>>>>>>>> Bad news (at least for me): I just run into this (for the error
>>>>>>>>>> and code see below). You have a road map when this feature will be
>>>>>>>>>> available? Regardless of the rest, I would need it in the near future.
>>>>>>>>>>
>>>>>>>>>> So far so good. But I wonder where this nested iteration should
>>>>>>>>>> be. At least I do not see them... I have an iteration and inside a lot of
>>>>>>>>>> filters/maps/etc. but not another iteration.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Max
>>>>>>>>>>
>>>>>>>>>> Error:
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>>>> while translating the optimized plan to a nephele JobGraph: An error
>>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>> at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>> at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>>> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>>>> ... 14 more
>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations are not possible at the moment!
>>>>>>>>>> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>>>> ... 33 more
>>>>>>>>>>
>>>>>>>>>> Code:
>>>>>>>>>>
>>>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>>>   val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>>   val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
>>>>>>>>>>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>>>>>>>>>>   val widthCandidates = env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>>
>>>>>>>>>>   val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>>>
>>>>>>>>>>   val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>>>>>>
>>>>>>>>>>   x map { _ toString } writeAsText config.outFile
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
>>>>>>>>>>   val residual_2 = residual * residual
>>>>>>>>>>   val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)
>>>>>>>>>>
>>>>>>>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>   val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>>>   val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N+1, Array("id")) {
>>>>>>>>>>     (solutionset, workset) =>
>>>>>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>>>         def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>       })
>>>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>>>
>>>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>>>       (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>       }),
>>>>>>>>>>       new_workset)
>>>>>>>>>>   }
>>>>>>>>>>   val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>     var y: Vector = null
>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>       y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>>>     }
>>>>>>>>>>     def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>>>   }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>>>>
>>>>>>>>>>   val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>     var index: Int = -1
>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>       val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>>>       index = x._1
>>>>>>>>>>     }
>>>>>>>>>>     def filter(x: Vector) = x.id == index
>>>>>>>>>>   }).withBroadcastSet(index, "index")
>>>>>>>>>>
>>>>>>>>>>   center neutralize
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>   X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>>>     var center: Vector = null
>>>>>>>>>>     var width: Vector = null
>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>       center = getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>>>       width = getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>>>   }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>   val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
>>>>>>>>>>     (solutionset, workset) =>
>>>>>>>>>>       val currentWidth = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>>>         def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>       })
>>>>>>>>>>
>>>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>>>
>>>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>>>
>>>>>>>>>>       val cost = (x1 / x2) neutralize
>>>>>>>>>>
>>>>>>>>>>       (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>       }),
>>>>>>>>>>       workset)
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   // todo: will not work
>>>>>>>>>>   //val width = costs max(0)
>>>>>>>>>>
>>>>>>>>>>   //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>>>
>>>>>>>>>>   //val x1 = kernelVector dot residual
>>>>>>>>>>   //val x2 = kernelVector dot kernelVector
>>>>>>>>>>   //val height = x1 / x2
>>>>>>>>>>   costs
>>>>>>>>>> }
>>>>>>>>>>
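Stripped of the Flink plumbing, the per-element computation in the quoted getKernelVector is a Gaussian kernel value, exp(-sum_i (x_i - c_i)^2 / w_i). The following is a minimal plain-Scala sketch of just that arithmetic (no Flink involved; plain arrays stand in for the custom Vector type used in the thread):

```scala
// Gaussian kernel value for a point x, given a center c and
// per-dimension widths w: exp(-sum_i (x(i) - c(i))^2 / w(i)).
// Plain arrays stand in for the thread's custom Vector type.
def kernelValue(x: Array[Double], c: Array[Double], w: Array[Double]): Double = {
  require(x.length == c.length && c.length == w.length)
  var s = 0.0
  var i = 0
  while (i < x.length) {
    val d = x(i) - c(i)
    s += d * d / w(i)
    i += 1
  }
  math.exp(-s)
}
```

A point exactly at the center yields 1.0, and the value decays with squared distance scaled by the width.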
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Stephan Ewen <se...@apache.org>.
Talking at the meetup sounds good!
On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:
> Ok. By "for-loop style" do you mean a loop with a fixed range?
> In my case I would have a delta-iteration inside a bulk-iteration. I guess
> that wouldn't be "roll-out-able"?
>
> Btw is there any intention to allow bulk-style iterations on several
> datasets "concurrently"?
>
> Maybe we could discuss my problem next week at the meetup?
>
> Thank you for the offer, but I'm in the middle of my thesis, so I don't
> have time for it.
>
> Cheers,
> Max
>
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Maximilian Alber <al...@gmail.com>.
Ok. By "for-loop style" do you mean a loop with a fixed range?
In my case I would have a delta-iteration inside a bulk-iteration. I guess
that wouldn't be "roll-out-able"?
Btw is there any intention to allow bulk-style iterations on several
datasets "concurrently"?
Maybe we could discuss my problem next week at the meetup?
Thank you for the offer, but I'm in the middle of my thesis, so I don't
have time for it.
Cheers,
Max
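For context: the cumSum delta iteration from the code earlier in this thread builds a cumulative sum of the squared residuals one superstep at a time. Ignoring distribution entirely, the value it computes is just a prefix sum; a plain-Scala sketch (Seq[Double] standing in for the DataSet, no Flink):

```scala
// Prefix sums: element k holds the sum of the first k+1 inputs.
// This is what the cumSum delta iteration accumulates, one id per
// superstep; locally it is a single scan.
def cumulativeSums(xs: Seq[Double]): Seq[Double] =
  xs.scanLeft(0.0)(_ + _).tail
```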
On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:
> We are not planning to add closed-loop nested iterations in the near
> future. That is a bit of an effort, and I think no one can pick it up
> very soon.
>
> We will be supporting roll-out iterations (for loop style) much more
> efficiently soon. There is no reason why you could not nest two for-loops.
> However, those are only bulk-style, not delta-iteration style.
>
> If you would like to contribute iteration nesting, I could help you to get
> started.
>
> Greetings,
> Stephan
>
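For reference, the kernel computed per element in getKernelVector above is exp of the negative, width-scaled squared distance to the center. Since `Vector` in the thread wraps an id and a `Float` array, the computation can be restated as a small plain-Scala function (a sketch for intuition; the name `kernelValue` and the bare-array signature are illustrative, not part of the original code):

```scala
// Plain-Scala restatement of the per-element kernel value in getKernelVector:
// exp(-sum(((x - center)^2) / width)).
def kernelValue(x: Array[Float], center: Array[Float], width: Array[Float]): Float = {
  require(x.length == center.length && x.length == width.length)
  val scaledSqDist = x.indices.map { i =>
    val d = x(i) - center(i)
    d * d / width(i)   // squared distance per dimension, scaled by the width
  }.sum
  math.exp(-scaledSqDist).toFloat
}
```

With x equal to the center the distance is zero and the kernel value is 1; it decays toward 0 as x moves away from the center.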
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Stephan Ewen <se...@apache.org>.
We are not planning to add closed-loop nested iterations in the near
future. That is a fair bit of effort, and I think no one can pick it up
very soon.
We will be supporting roll-out iterations (for-loop style) much more
efficiently soon, and there is no reason why you could not nest two for-loops.
However, those are only bulk-style, not delta-iteration style.
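The "roll-out" idea can be modeled without any Flink machinery. In the sketch below, `innerIterate` merely stands in for a bulk `DataSet.iterate`; the point is that the outer loop lives in the driver program, so at the plan level no iteration is ever nested inside another (both function names are illustrative):

```scala
// Model of roll-out nesting: each outer pass launches one independent
// inner (bulk-style) iteration; nothing is nested inside a single plan.
def innerIterate[A](steps: Int, init: A)(step: A => A): A =
  (0 until steps).foldLeft(init)((acc, _) => step(acc))

def rolledOut[A](outer: Int, inner: Int, init: A)(step: A => A): A =
  (0 until outer).foldLeft(init) { (acc, _) =>
    innerIterate(inner, acc)(step) // one bulk iteration per outer pass
  }
```

For example, rolledOut(2, 3, 0)(_ + 1) applies the step 2 × 3 = 6 times, exactly as two nested for-loops would.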
If you would like to contribute iteration nesting, I could help you get
started.
Greetings,
Stephan
On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:
> Oh sorry, I only read the bug title. So my question is: when are you
> planning to add nested iterations?
>
> Cheers,
> Max
>
> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Ok, thanks.
>>
>> So does the bug make Flink "see" a nested iteration where there is none?
>> Or is the bug simply that nesting is not supported? If the latter, when do
>> you plan to add this feature? I need nested iterations for my algorithm,
>> so it would be nice to know when I can expect them.
>>
>> Cheers,
>> Max
>>
>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> I found the cause of the bug and have opened a JIRA to track it.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>
>>> You can watch that one to keep updated.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> I am looking into it right now...
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> you already had time to investigate this issue?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hey!
>>>>>>
>>>>>> Clearly, this looks like a bug. Let me investigate that and get back
>>>>>> at you later...
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flinksters!
>>>>>>>
>>>>>>> First some good news: the cumsum code from the last issue now works
>>>>>>> correctly and is tested.
>>>>>>>
>>>>>>> Bad news (at least for me): I just ran into the error below (code also
>>>>>>> below). Do you have a roadmap for when this feature will be available?
>>>>>>> Regardless of the rest, I would need it in the near future.
>>>>>>>
>>>>>>> That said, I wonder where this nested iteration is supposed to be. I do
>>>>>>> not see one: I have a single iteration containing many filters/maps/etc.,
>>>>>>> but no second iteration inside it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> Error:
>>>>>>>
>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>> Iterations are not possible at the moment!
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>> Iterations are not possible at the moment!
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>> ... 14 more
>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>> Iterations are not possible at the moment!
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>> ... 33 more
>>>>>>>
>>>>>>> Code:
>>>>>>>
>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>> val X = env readTextFile config.xFile map
>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>> val residual = env readTextFile config.yFile map
>>>>>>> {Vector.parseFromString(_)}
>>>>>>> val randoms = env readTextFile config.randomFile map
>>>>>>> {Vector.parseFromString(_)}
>>>>>>> val widthCandidates = env readTextFile
>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>> _)}
>>>>>>>
>>>>>>> val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>
>>>>>>> val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>> center)
>>>>>>>
>>>>>>> x map { _ toString } writeAsText config.outFile
>>>>>>> }
>>>>>>>
>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>> DataSet[Vector] = {
>>>>>>> val residual_2 = residual * residual
>>>>>>> val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>>>>> neutralize)
>>>>>>>
>>>>>>> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>> val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>> val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>> (solutionset, workset) =>
>>>>>>> val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>> def filter(x: Vector) = x.id ==
>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>> })
>>>>>>> val old_sum = workset filter {_.id == -1}
>>>>>>> val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>
>>>>>>> val new_workset = workset filter {_.id != -1} union sum
>>>>>>> (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>> def map(x: Vector) = new
>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>> }),
>>>>>>> new_workset)
>>>>>>> }
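The delta iteration above builds a cumulative sum: each superstep picks the element whose id equals the superstep number, adds it to the running total carried in the id == -1 element, and emits the total tagged with the superstep's id. Ignoring the dataflow mechanics, the value it computes is an ordinary prefix sum, sketched here in plain Scala (not the original Flink code):

```scala
// What the cumSum delta iteration computes, per id i: the sum of the
// first i+1 residual values (a prefix / cumulative sum).
def cumSum(values: Seq[Float]): Seq[Float] =
  values.scanLeft(0f)(_ + _).tail   // drop the leading 0 seed
```

For example, cumSum(Seq(1f, 2f, 3f)) yields Seq(1.0, 3.0, 6.0).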
>>>>>>> val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>> var y: Vector = null
>>>>>>> override def open(config: Configuration) = {
>>>>>>> y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>> }
>>>>>>> def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>> }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>
>>>>>>> val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>> var index: Int = -1
>>>>>>> override def open(config: Configuration) = {
>>>>>>> val x: Tuple1[Int] =
>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>> index = x._1
>>>>>>> }
>>>>>>> def filter(x: Vector) = x.id == index
>>>>>>> }).withBroadcastSet(index, "index")
>>>>>>>
>>>>>>> center neutralize
>>>>>>> }
>>>>>>>
>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>> X.map(new RichMapFunction[Vector, Vector]{
>>>>>>> var center: Vector = null
>>>>>>> var width: Vector = null
>>>>>>> override def open(config: Configuration) = {
>>>>>>> center =
>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>> width =
>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>> }
>>>>>>>
>>>>>>> def map(x: Vector) = new Vector(x.id,
>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>> }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>>> "width")
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>> val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>> (solutionset, workset) =>
>>>>>>> val currentWidth = workset filter (new
>>>>>>> RichFilterFunction[Vector]{
>>>>>>> def filter(x: Vector) = x.id ==
>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>> })
>>>>>>>
>>>>>>> val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>
>>>>>>> val x1 = kernelVector dot residual map {x => x*x}
>>>>>>> val x2 = kernelVector dot kernelVector
>>>>>>>
>>>>>>> val cost = (x1 / x2) neutralize
>>>>>>>
>>>>>>>
>>>>>>> (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>> def map(x: Vector) = new
>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>> }),
>>>>>>> workset)
>>>>>>> }
>>>>>>>
>>>>>>> // todo: will not work
>>>>>>> //val width = costs max(0)
>>>>>>>
>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>
>>>>>>> //val x1 = kernelVector dot residual
>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>> //val height = x1 / x2
>>>>>>> costs
>>>>>>> }
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Maximilian Alber <al...@gmail.com>.
Oh sorry, I only read the bug title. So my question is: when are you
planning to add nested iterations?
Cheers,
Max
On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:
> Ok, thanks.
>
> So does the bug make Flink "see" a nested iteration where there is none?
> Or is the bug simply that nesting is not supported? If the latter, when do
> you plan to add this feature? I need nested iterations for my algorithm,
> so it would be nice to know when I can expect them.
>
> Cheers,
> Max
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Maximilian Alber <al...@gmail.com>.
Ok, thanks.
So does the bug make Flink "see" a nested iteration where there is none?
Or is the bug simply that nesting is not supported? If the latter, when do
you plan to add this feature? I need nested iterations for my algorithm,
so it would be nice to know when I can expect them.
Cheers,
Max
On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:
> I found the cause of the bug and have opened a JIRA to track it.
>
> https://issues.apache.org/jira/browse/FLINK-1235
>
> You can watch that one to keep updated.
>
> Stephan
>
>
> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> I am looking into it right now...
>>
>> Stephan
>>
>>
>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>> you already had time to investigate this issue?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hey!
>>>>
>>>> Clearly, this looks like a bug. Let me investigate that and get back at
>>>> you later...
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Hi Flinksters!
>>>>>
>>>>> First some good news: the cumsum code from the last issue works now
>>>>> correctly and is tested.
>>>>>
>>>>> Bad news (at least for me): I just run into this (for the error and
>>>>> code see below). You have a road map when this feature will be available?
>>>>> Regardless of the rest, I would need it in the near future.
>>>>>
>>>>> So far so good. But I wonder where this nested iteration should be. At
>>>>> least I do not see them... I have an iteration and inside a lot of
>>>>> filters/maps/etc. but not another iteration.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> Error:
>>>>>
>>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>>> Iterations are not possible at the moment!
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>> Iterations are not possible at the moment!
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>> ... 14 more
>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>> Iterations are not possible at the moment!
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>> ... 33 more
>>>>>
>>>>> Code:
>>>>>
>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>> val X = env readTextFile config.xFile map
>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>> val residual = env readTextFile config.yFile map
>>>>> {Vector.parseFromString(_)}
>>>>> val randoms = env readTextFile config.randomFile map
>>>>> {Vector.parseFromString(_)}
>>>>> val widthCandidates = env readTextFile config.widthCandidatesFile
>>>>> map {Vector.parseFromString(config.dimensions, _)}
>>>>>
>>>>> val center = calcCenter(env, X, residual, randoms, 0)
>>>>>
>>>>> val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>
>>>>> x map { _ toString } writeAsText config.outFile
>>>>> }
>>>>>
>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>> DataSet[Vector] = {
>>>>> val residual_2 = residual * residual
>>>>> val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>>> neutralize)
>>>>>
>>>>> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>> val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>> val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>>>>> config.N+1, Array("id")) {
>>>>> (solutionset, workset) =>
>>>>> val current = workset filter (new RichFilterFunction[Vector]{
>>>>> def filter(x: Vector) = x.id ==
>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>> })
>>>>> val old_sum = workset filter {_.id == -1}
>>>>> val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>
>>>>> val new_workset = workset filter {_.id != -1} union sum
>>>>> (sum map (new RichMapFunction[Vector, Vector]{
>>>>> def map(x: Vector) = new
>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>> }),
>>>>> new_workset)
>>>>> }
>>>>> val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>> var y: Vector = null
>>>>> override def open(config: Configuration) = {
>>>>> y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>> }
>>>>> def filter(x: Vector) = x.values(0) < y.values(0)
>>>>> }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>
>>>>> val center = X.filter(new RichFilterFunction[Vector](){
>>>>> var index: Int = -1
>>>>> override def open(config: Configuration) = {
>>>>> val x: Tuple1[Int] =
>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>> index = x._1
>>>>> }
>>>>> def filter(x: Vector) = x.id == index
>>>>> }).withBroadcastSet(index, "index")
>>>>>
>>>>> center neutralize
>>>>> }
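[Editor's note: the cumSum delta iteration above is easier to follow stripped of the Flink plumbing. The following is a plain-Scala sketch (no Flink; data and names are illustrative) of the arithmetic it performs: a running prefix sum over the squared residuals, then counting how many prefix sums fall strictly below the threshold `ys` to obtain the center index — the counterpart of the broadcast filter plus `sum 0`.]

```scala
object CumSumSketch {
  // Prefix sums of the squared residuals: what the cumSum delta iteration
  // accumulates, one element per superstep.
  def cumulativeSums(residual2: Seq[Double]): Seq[Double] =
    residual2.scanLeft(0.0)(_ + _).tail

  // Number of prefix sums strictly below the threshold `ys` -- the
  // counterpart of the broadcast filter + `sum 0` that selects the center.
  def centerIndex(residual2: Seq[Double], ys: Double): Int =
    cumulativeSums(residual2).count(_ < ys)

  def main(args: Array[String]): Unit = {
    val r2 = Seq(1.0, 2.0, 3.0)   // illustrative squared residuals
    println(cumulativeSums(r2))   // List(1.0, 3.0, 6.0)
    println(centerIndex(r2, 4.0)) // 2
  }
}
```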
>>>>>
>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>> X.map(new RichMapFunction[Vector, Vector]{
>>>>> var center: Vector = null
>>>>> var width: Vector = null
>>>>> override def open(config: Configuration) = {
>>>>> center =
>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>> width =
>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>> }
>>>>>
>>>>> def map(x: Vector) = new Vector(x.id,
>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>> }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>> "width")
>>>>> }
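[Editor's note: getKernelVector broadcasts `center` and `width` and computes a Gaussian-style kernel value per point, exp(-sum((x - center)^2 / width)). A minimal plain-Scala sketch of that per-point arithmetic, with plain arrays standing in for the broadcast DataSets (all values here are illustrative):]

```scala
object KernelSketch {
  // exp(-sum((x - center)^2 / width)): the map in getKernelVector,
  // with plain arrays in place of the broadcast variables.
  def kernelValue(x: Array[Double], center: Array[Double], width: Array[Double]): Double = {
    val exponent = x.indices.map { i =>
      val d = x(i) - center(i)
      d * d / width(i)
    }.sum
    math.exp(-exponent)
  }

  def main(args: Array[String]): Unit = {
    // A point exactly at the center gives kernel value 1.0
    println(kernelValue(Array(1.0, 2.0), Array(1.0, 2.0), Array(0.5, 0.5)))
  }
}
```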
>>>>>
>>>>>
>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>> val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>> config.NWidthCandidates, Array("id")) {
>>>>> (solutionset, workset) =>
>>>>> val currentWidth = workset filter (new
>>>>> RichFilterFunction[Vector]{
>>>>> def filter(x: Vector) = x.id ==
>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>> })
>>>>>
>>>>> val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>
>>>>> val x1 = kernelVector dot residual map {x => x*x}
>>>>> val x2 = kernelVector dot kernelVector
>>>>>
>>>>> val cost = (x1 / x2) neutralize
>>>>>
>>>>>
>>>>> (cost map (new RichMapFunction[Vector, Vector]{
>>>>> def map(x: Vector) = new
>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>> }),
>>>>> workset)
>>>>> }
>>>>>
>>>>> // todo: will not work
>>>>> //val width = costs max(0)
>>>>>
>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>
>>>>> //val x1 = kernelVector dot residual
>>>>> //val x2 = kernelVector dot kernelVector
>>>>> //val height = x1 / x2
>>>>> costs
>>>>> }
>>>>>
>>>>
>>>>
>>>
>>
>
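[Editor's note: calcWidthHeight scores each width candidate with cost = (k · residual)^2 / (k · k), where k is the kernel vector for that candidate; the commented-out `costs max(0)` would then pick the best-scoring width. A plain-Scala sketch of the per-candidate score (no Flink; data illustrative):]

```scala
object WidthCostSketch {
  def dot(a: Seq[Double], b: Seq[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // cost = (k . residual)^2 / (k . k): the score computed per superstep
  // in calcWidthHeight for one width candidate's kernel vector k.
  def cost(kernel: Seq[Double], residual: Seq[Double]): Double = {
    val kr = dot(kernel, residual)
    kr * kr / dot(kernel, kernel)
  }

  def main(args: Array[String]): Unit = {
    println(cost(Seq(1.0, 1.0), Seq(2.0, 0.0))) // 2.0
  }
}
```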
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Stephan Ewen <se...@apache.org>.
I found the cause of the bug and have opened a JIRA to track it.
https://issues.apache.org/jira/browse/FLINK-1235
You can watch that issue to stay updated.
Stephan
On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
>
> I am looking into it right now...
>
> Stephan
>
>
> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Have you already had time to investigate this issue?
>>
>> Cheers,
>> Max
>>
>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hey!
>>>
>>> Clearly, this looks like a bug. Let me investigate it and get back to
>>> you later...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Hi Flinksters!
>>>>
>>>> First some good news: the cumsum code from the last issue works now
>>>> correctly and is tested.
>>>>
>>>> Bad news (at least for me): I just ran into this (see the error and
>>>> code below). Do you have a roadmap for when this feature will be
>>>> available? Regardless of the rest, I would need it in the near future.
>>>>
>>>> So far so good. But I wonder where this nested iteration is supposed to
>>>> be. At least I do not see one... I have an iteration containing a lot of
>>>> filters/maps/etc., but no other iteration.
>>>>
>>>> Cheers,
>>>> Max
Re: No Nested Iterations??? And where is the Nested Iteration?
Posted by Stephan Ewen <se...@apache.org>.
Hi!
I am looking into it right now...
Stephan
On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:
> Hi Stephan,
>
> Have you already had time to investigate this issue?
>
> Cheers,
> Max