You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Maximilian Alber <al...@gmail.com> on 2014/11/11 14:09:26 UTC

Re: No Nested Iterations??? And where is the Nested Iteration?

Hi Stephan,

Have you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:

> Hey!
>
> Clearly, this looks like a bug. Let me investigate that and get back at
> you later...
>
> Greetings,
> Stephan
>
>
> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi Flinksters!
>>
>> First some good news: the cumsum code from the last issue works now
>> correctly and is tested.
>>
>> Bad news (at least for me): I just ran into this (for the error and code
>> see below). Do you have a road map for when this feature will be available?
>> Regardless of the rest, I would need it in the near future.
>>
>> So far so good. But I wonder where this nested iteration should be. At
>> least I do not see them... I have an iteration and inside a lot of
>> filters/maps/etc. but not another iteration.
>>
>> Cheers,
>> Max
>>
>> Error:
>>
>> org.apache.flink.compiler.CompilerException: An error occurred while
>> translating the optimized plan to a nephele JobGraph: An error occurred
>> while translating the optimized plan to a nephele JobGraph: Nested
>> Iterations are not possible at the moment!
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>> at org.apache.flink.client.program.Client.run(Client.java:290)
>> at org.apache.flink.client.program.Client.run(Client.java:285)
>> at org.apache.flink.client.program.Client.run(Client.java:230)
>> at
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> Caused by: org.apache.flink.compiler.CompilerException: An error occurred
>> while translating the optimized plan to a nephele JobGraph: Nested
>> Iterations are not possible at the moment!
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>> ... 14 more
>> Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations
>> are not possible at the moment!
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>> ... 33 more
>>
>> Code:
>>
>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>     val X = env readTextFile config.xFile map
>> {Vector.parseFromString(config.dimensions, _)}
>>     val residual = env readTextFile config.yFile map
>> {Vector.parseFromString(_)}
>>     val randoms = env readTextFile config.randomFile map
>> {Vector.parseFromString(_)}
>>     val widthCandidates = env readTextFile config.widthCandidatesFile map
>> {Vector.parseFromString(config.dimensions, _)}
>>
>>     val center = calcCenter(env, X, residual, randoms, 0)
>>
>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>
>>     x map { _ toString } writeAsText config.outFile
>> }
>>
>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
>> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
>> = {
>>     val residual_2 = residual * residual
>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>> neutralize)
>>
>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>     val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>> config.N+1, Array("id")) {
>>       (solutionset, workset) =>
>>       val current = workset filter (new RichFilterFunction[Vector]{
>>         def filter(x: Vector) = x.id ==
>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>        })
>>       val old_sum = workset filter {_.id == -1}
>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>
>>       val new_workset = workset filter {_.id != -1} union sum
>>        (sum map (new RichMapFunction[Vector, Vector]{
>>           def map(x: Vector) = new
>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>        }),
>>       new_workset)
>>      }
>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>       var y: Vector = null
>>      override def open(config: Configuration) = {
>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>      }
>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>
>>     val center = X.filter(new RichFilterFunction[Vector](){
>>     var index: Int = -1
>>     override def open(config: Configuration) = {
>>       val x: Tuple1[Int] =
>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>       index = x._1
>>        }
>>       def filter(x: Vector) = x.id == index
>>     }).withBroadcastSet(index, "index")
>>
>>     center neutralize
>> }
>>
>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width:
>> DataSet[Vector]): DataSet[Vector] = {
>>     X.map(new RichMapFunction[Vector, Vector]{
>>       var center: Vector = null
>>       var width: Vector = null
>>       override def open(config: Configuration) = {
>>        center =
>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>        width = getRuntimeContext.getBroadcastVariable("width").toList.head
>>      }
>>
>>     def map(x: Vector) = new Vector(x.id,
>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>     }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
>> }
>>
>>
>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>> DataSet[Vector]): DataSet[Vector] = {
>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>> config.NWidthCandidates, Array("id")) {
>>        (solutionset, workset) =>
>>        val currentWidth = workset filter (new RichFilterFunction[Vector]{
>>          def filter(x: Vector) = x.id ==
>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>        })
>>
>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>
>>       val x1 = kernelVector dot residual map {x => x*x}
>>       val x2 = kernelVector dot kernelVector
>>
>>      val cost = (x1 / x2) neutralize
>>
>>
>>      (cost map (new RichMapFunction[Vector, Vector]{
>>        def map(x: Vector) = new
>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>      }),
>>      workset)
>>     }
>>
>> // todo: will not work
>> //val width = costs max(0)
>>
>> //val kernelVector = getKernelVector(X, center, width)
>>
>> //val x1 = kernelVector dot residual
>> //val x2 = kernelVector dot kernelVector
>> //val height = x1 / x2
>>     costs
>> }
>>
>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Maximilian Alber <al...@gmail.com>.
The error is gone.
Thanks!

Cheers,
Max

On Wed, Nov 12, 2014 at 11:41 PM, Stephan Ewen <se...@apache.org> wrote:

> The current master contains a fix for the incorrectly identified nested
> iteration bug.
>
> Please let us know if it fixes your problem!
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 12, 2014 at 5:19 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Talking at the meetup sounds good!
>>
>> On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok. By "for loop style", do you mean a loop with a fixed range?
>>> In my case I would have a delta-iteration inside a bulk-iteration. I
>>> guess that wouldn't be "roll-out-able"?
>>>
>>> Btw is there any intention to allow bulk-style iterations on several
>>> datasets "concurrently"?
>>>
>>> Maybe we could discuss my problem next week at the meetup?
>>>
>>> Thank you for the offer, but I'm in the middle of my thesis, so I don't
>>> have time for it.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> We are not planning to add closed-loop nested iterations in the near
>>>> future. That is a bit of an effort, and I think no one can pick
>>>> that up very soon.
>>>>
>>>> We will be supporting roll-out iterations (for loop style) much more
>>>> efficiently soon. There is no reason why you could not nest two for-loops.
>>>> However, those are only bulk-style, not delta-iteration style.
>>>>
>>>> If you would like to contribute iteration nesting, I could help you to
>>>> get started.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Oh sorry, I just read the bug title. So my questions is when you are
>>>>> planning to add nested iterations?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Ok, thanks.
>>>>>>
>>>>>> But the bug causes Flink to "see" a nested iteration where none
>>>>>> exists?
>>>>>> Or is it a bug that nested iterations are not supported? If not, when
>>>>>> do you plan to add this feature?
>>>>>> Because I need nested iterations for my algorithm, so it would be
>>>>>> nice to know when I can expect them.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>>>
>>>>>>> You can watch that one to keep updated.
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> I am looking into it right now...
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stephan,
>>>>>>>>>
>>>>>>>>> Have you already had time to investigate this issue?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey!
>>>>>>>>>>
>>>>>>>>>> Clearly, this looks like a bug. Let me investigate that and get
>>>>>>>>>> back at you later...
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Flinksters!
>>>>>>>>>>>
>>>>>>>>>>> First some good news: the cumsum code from the last issue works
>>>>>>>>>>> now correctly and is tested.
>>>>>>>>>>>
>>>>>>>>>>> Bad news (at least for me): I just ran into this (for the error
>>>>>>>>>>> and code see below). Do you have a road map for when this feature
>>>>>>>>>>> will be available? Regardless of the rest, I would need it in the near future.
>>>>>>>>>>>
>>>>>>>>>>> So far so good. But I wonder where this nested iteration should
>>>>>>>>>>> be. At least I do not see them... I have an iteration and inside a lot of
>>>>>>>>>>> filters/maps/etc. but not another iteration.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> Error:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>>>>> while translating the optimized plan to a nephele JobGraph: An error
>>>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>>>>> ... 14 more
>>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>>>>> ... 33 more
>>>>>>>>>>>
>>>>>>>>>>> Code:
>>>>>>>>>>>
>>>>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>>>>     val X = env readTextFile config.xFile map
>>>>>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>>>     val residual = env readTextFile config.yFile map
>>>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>>>     val widthCandidates = env readTextFile
>>>>>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>>>>>> _)}
>>>>>>>>>>>
>>>>>>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>>>>
>>>>>>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>>>>>> center)
>>>>>>>>>>>
>>>>>>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>>>>>> DataSet[Vector] = {
>>>>>>>>>>>     val residual_2 = residual * residual
>>>>>>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>>>>>>>>> iteration} neutralize)
>>>>>>>>>>>
>>>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>>>>>>       (solutionset, workset) =>
>>>>>>>>>>>       val current = workset filter (new
>>>>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>>>>         def filter(x: Vector) = x.id ==
>>>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>>        })
>>>>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>>>>
>>>>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>>           def map(x: Vector) = new
>>>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>>        }),
>>>>>>>>>>>       new_workset)
>>>>>>>>>>>      }
>>>>>>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>>       var y: Vector = null
>>>>>>>>>>>      override def open(config: Configuration) = {
>>>>>>>>>>>         y =
>>>>>>>>>>> getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>>>>      }
>>>>>>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)}
>>>>>>>>>>> sum 0
>>>>>>>>>>>
>>>>>>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>>     var index: Int = -1
>>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>>       val x: Tuple1[Int] =
>>>>>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>>>>       index = x._1
>>>>>>>>>>>        }
>>>>>>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>>>>>>
>>>>>>>>>>>     center neutralize
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>>>>       var center: Vector = null
>>>>>>>>>>>       var width: Vector = null
>>>>>>>>>>>       override def open(config: Configuration) = {
>>>>>>>>>>>        center =
>>>>>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>>>>        width =
>>>>>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>>>>      }
>>>>>>>>>>>
>>>>>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>>>>     }).withBroadcastSet(center,
>>>>>>>>>>> "center").withBroadcastSet(width, "width")
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X:
>>>>>>>>>>> DataSet[Vector], residual: DataSet[Vector], widthCandidates:
>>>>>>>>>>> DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>>>>>>        (solutionset, workset) =>
>>>>>>>>>>>        val currentWidth = workset filter (new
>>>>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>>>>          def filter(x: Vector) = x.id ==
>>>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>>        })
>>>>>>>>>>>
>>>>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>>>>
>>>>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>>>>
>>>>>>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>>        def map(x: Vector) = new
>>>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>>      }),
>>>>>>>>>>>      workset)
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> // todo: will not work
>>>>>>>>>>> //val width = costs max(0)
>>>>>>>>>>>
>>>>>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>>>>
>>>>>>>>>>> //val x1 = kernelVector dot residual
>>>>>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>>>>>> //val height = x1 / x2
>>>>>>>>>>>     costs
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Stephan Ewen <se...@apache.org>.
The current master contains a fix for the incorrectly identified nested
iteration bug.

Please let us know if it fixes your problem!

Greetings,
Stephan


On Wed, Nov 12, 2014 at 5:19 PM, Stephan Ewen <se...@apache.org> wrote:

> Talking at the meetup sounds good!
>
> On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Ok. By "for loop style", do you mean a loop with a fixed range?
>> In my case I would have a delta-iteration inside a bulk-iteration. I
>> guess that wouldn't be "roll-out-able"?
>>
>> Btw is there any intention to allow bulk-style iterations on several
>> datasets "concurrently"?
>>
>> Maybe we could discuss my problem next week at the meetup?
>>
>> Thank you for the offer, but I'm in the middle of my thesis, so I don't
>> have time for it.
>>
>> Cheers,
>> Max
>>
>> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> We are not planning to add closed-loop nested iterations in the near
>>> future. That is a bit of an effort, and I think no one can pick
>>> that up very soon.
>>>
>>> We will be supporting roll-out iterations (for loop style) much more
>>> efficiently soon. There is no reason why you could not nest two for-loops.
>>> However, those are only bulk-style, not delta-iteration style.
>>>
>>> If you would like to contribute iteration nesting, I could help you to
>>> get started.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Oh sorry, I just read the bug title. So my questions is when you are
>>>> planning to add nested iterations?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Ok, thanks.
>>>>>
>>>>> But the bug causes Flink to "see" a nested iteration where none
>>>>> exists?
>>>>> Or is it a bug that nested iterations are not supported? If not, when
>>>>> do you plan to add this feature?
>>>>> Because I need nested iterations for my algorithm, so it would be nice
>>>>> to know when I can expect them.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>>
>>>>>> You can watch that one to keep updated.
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> I am looking into it right now...
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stephan,
>>>>>>>>
>>>>>>>> Have you already had time to investigate this issue?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey!
>>>>>>>>>
>>>>>>>>> Clearly, this looks like a bug. Let me investigate that and get
>>>>>>>>> back at you later...
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flinksters!
>>>>>>>>>>
>>>>>>>>>> First some good news: the cumsum code from the last issue works
>>>>>>>>>> now correctly and is tested.
>>>>>>>>>>
>>>>>>>>>> Bad news (at least for me): I just ran into this (for the error
>>>>>>>>>> and code see below). Do you have a road map for when this feature
>>>>>>>>>> will be available? Regardless of the rest, I would need it in the near future.
>>>>>>>>>>
>>>>>>>>>> So far so good. But I wonder where this nested iteration should
>>>>>>>>>> be. At least I do not see them... I have an iteration and inside a lot of
>>>>>>>>>> filters/maps/etc. but not another iteration.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Max
>>>>>>>>>>
>>>>>>>>>> Error:
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>>>> while translating the optimized plan to a nephele JobGraph: An error
>>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>>>> ... 14 more
>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>>>> ... 33 more
>>>>>>>>>>
>>>>>>>>>> Code:
>>>>>>>>>>
>>>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>>>     val X = env readTextFile config.xFile map
>>>>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>>     val residual = env readTextFile config.yFile map
>>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>>     val widthCandidates = env readTextFile
>>>>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>>>>> _)}
>>>>>>>>>>
>>>>>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>>>
>>>>>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>>>>> center)
>>>>>>>>>>
>>>>>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>>>>> DataSet[Vector] = {
>>>>>>>>>>     val residual_2 = residual * residual
>>>>>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>>>>>>>> iteration} neutralize)
>>>>>>>>>>
>>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>>>>>       (solutionset, workset) =>
>>>>>>>>>>       val current = workset filter (new
>>>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>>>         def filter(x: Vector) = x.id ==
>>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>        })
>>>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>>>
>>>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>           def map(x: Vector) = new
>>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>        }),
>>>>>>>>>>       new_workset)
>>>>>>>>>>      }
>>>>>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>       var y: Vector = null
>>>>>>>>>>      override def open(config: Configuration) = {
>>>>>>>>>>         y =
>>>>>>>>>> getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>>>      }
>>>>>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum
>>>>>>>>>> 0
>>>>>>>>>>
>>>>>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>     var index: Int = -1
>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>       val x: Tuple1[Int] =
>>>>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>>>       index = x._1
>>>>>>>>>>        }
>>>>>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>>>>>
>>>>>>>>>>     center neutralize
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>>>       var center: Vector = null
>>>>>>>>>>       var width: Vector = null
>>>>>>>>>>       override def open(config: Configuration) = {
>>>>>>>>>>        center =
>>>>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>>>        width =
>>>>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>>>      }
>>>>>>>>>>
>>>>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>>>>>> "width")
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X:
>>>>>>>>>> DataSet[Vector], residual: DataSet[Vector], widthCandidates:
>>>>>>>>>> DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>>>>>        (solutionset, workset) =>
>>>>>>>>>>        val currentWidth = workset filter (new
>>>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>>>          def filter(x: Vector) = x.id ==
>>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>        })
>>>>>>>>>>
>>>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>>>
>>>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>>>
>>>>>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>        def map(x: Vector) = new
>>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>      }),
>>>>>>>>>>      workset)
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> // todo: will not work
>>>>>>>>>> //val width = costs max(0)
>>>>>>>>>>
>>>>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>>>
>>>>>>>>>> //val x1 = kernelVector dot residual
>>>>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>>>>> //val height = x1 / x2
>>>>>>>>>>     costs
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Stephan Ewen <se...@apache.org>.
Talking at the meetup sounds good!

On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Ok. By "for-loop style", do you mean a loop with a fixed range?
> In my case I would have a delta-iteration inside a bulk-iteration. I guess
> that wouldn't be "roll-out-able"?
>
> Btw is there any intention to allow bulk-style iterations on several
> datasets "concurrently"?
>
> Maybe we could discuss my problem next week at the meetup?
>
> Thank you for the offer, but I'm in the middle of thesis, thus I don't
> have time for it.
>
> Cheers,
> Max
>
> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> We are not planning to add closed-loop nested iterations in the near
>> future. That is a bit of an effort and so far, and I think no one can pick
>> that up very soon.
>>
>> We will be supporting roll-out iterations (for loop style) much more
>> efficiently soon. There is no reason why you could not nest two for-loops.
>> However, those are only bulk-style, not delta-iteration style.
>>
>> If you would like to contribute iteration nesting, I could help you to
>> get started.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Oh sorry, I just read the bug title. So my questions is when you are
>>> planning to add nested iterations?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Ok, thanks.
>>>>
>>>> But the bug causes that it Flink "sees" a nested iteration where none
>>>> is?
>>>> Or is it a bug that nested are not supported? If not when you plan to
>>>> add this feature?
>>>> Because I need nested iterations for my algorithm, so it would be nice
>>>> to know when I can expect them.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>
>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>
>>>>> You can watch that one to keep updated.
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> I am looking into it right now...
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> you already had time to investigate this issue?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey!
>>>>>>>>
>>>>>>>> Clearly, this looks like a bug. Let me investigate that and get
>>>>>>>> back at you later...
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Flinksters!
>>>>>>>>>
>>>>>>>>> First some good news: the cumsum code from the last issue works
>>>>>>>>> now correctly and is tested.
>>>>>>>>>
>>>>>>>>> Bad news (at least for me): I just run into this (for the error
>>>>>>>>> and code see below). You have a road map when this feature will be
>>>>>>>>> available? Regardless of the rest, I would need it in the near future.
>>>>>>>>>
>>>>>>>>> So far so good. But I wonder where this nested iteration should
>>>>>>>>> be. At least I do not see them... I have an iteration and inside a lot of
>>>>>>>>> filters/maps/etc. but not another iteration.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> Error:
>>>>>>>>>
>>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>>> while translating the optimized plan to a nephele JobGraph: An error
>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>>> ... 14 more
>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>>> ... 33 more
>>>>>>>>>
>>>>>>>>> Code:
>>>>>>>>>
>>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>>     val X = env readTextFile config.xFile map
>>>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>     val residual = env readTextFile config.yFile map
>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>     val widthCandidates = env readTextFile
>>>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>>>> _)}
>>>>>>>>>
>>>>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>>
>>>>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>>>> center)
>>>>>>>>>
>>>>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>>>> DataSet[Vector] = {
>>>>>>>>>     val residual_2 = residual * residual
>>>>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>>>>>>> iteration} neutralize)
>>>>>>>>>
>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>>>>       (solutionset, workset) =>
>>>>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>>         def filter(x: Vector) = x.id ==
>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>        })
>>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>>
>>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>           def map(x: Vector) = new
>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>        }),
>>>>>>>>>       new_workset)
>>>>>>>>>      }
>>>>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>>       var y: Vector = null
>>>>>>>>>      override def open(config: Configuration) = {
>>>>>>>>>         y =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>>      }
>>>>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>>>
>>>>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>>     var index: Int = -1
>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>       val x: Tuple1[Int] =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>>       index = x._1
>>>>>>>>>        }
>>>>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>>>>
>>>>>>>>>     center neutralize
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>>       var center: Vector = null
>>>>>>>>>       var width: Vector = null
>>>>>>>>>       override def open(config: Configuration) = {
>>>>>>>>>        center =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>>        width =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>>      }
>>>>>>>>>
>>>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>>>>> "width")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>>>>        (solutionset, workset) =>
>>>>>>>>>        val currentWidth = workset filter (new
>>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>>          def filter(x: Vector) = x.id ==
>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>        })
>>>>>>>>>
>>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>>
>>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>>
>>>>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>        def map(x: Vector) = new
>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>      }),
>>>>>>>>>      workset)
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> // todo: will not work
>>>>>>>>> //val width = costs max(0)
>>>>>>>>>
>>>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>>
>>>>>>>>> //val x1 = kernelVector dot residual
>>>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>>>> //val height = x1 / x2
>>>>>>>>>     costs
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Maximilian Alber <al...@gmail.com>.
Ok. By "for-loop style", do you mean a loop with a fixed range?
In my case I would have a delta-iteration inside a bulk-iteration. I guess
that wouldn't be "roll-out-able"?

Btw is there any intention to allow bulk-style iterations on several
datasets "concurrently"?

Maybe we could discuss my problem next week at the meetup?

Thank you for the offer, but I'm in the middle of my thesis, so I don't have
time for it.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <se...@apache.org> wrote:

> We are not planning to add closed-loop nested iterations in the near
> future. That is a bit of an effort and so far, and I think no one can pick
> that up very soon.
>
> We will be supporting roll-out iterations (for loop style) much more
> efficiently soon. There is no reason why you could not nest two for-loops.
> However, those are only bulk-style, not delta-iteration style.
>
> If you would like to contribute iteration nesting, I could help you to get
> started.
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Oh sorry, I just read the bug title. So my questions is when you are
>> planning to add nested iterations?
>>
>> Cheers,
>> Max
>>
>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok, thanks.
>>>
>>> But does the bug cause Flink to "see" a nested iteration where none exists?
>>> Or is it a bug that nested iterations are not supported? If so, when do you
>>> plan to add this feature?
>>> Because I need nested iterations for my algorithm, so it would be nice
>>> to know when I can expect them.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>
>>>> You can watch that one to keep updated.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> I am looking into it right now...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> you already had time to investigate this issue?
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey!
>>>>>>>
>>>>>>> Clearly, this looks like a bug. Let me investigate that and get back
>>>>>>> at you later...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flinksters!
>>>>>>>>
>>>>>>>> First some good news: the cumsum code from the last issue works now
>>>>>>>> correctly and is tested.
>>>>>>>>
>>>>>>>> Bad news (at least for me): I just ran into this (for the error and
>>>>>>>> code see below). Do you have a road map for when this feature will be
>>>>>>>> available? Regardless of the rest, I would need it in the near future.
>>>>>>>>
>>>>>>>> So far so good. But I wonder where this nested iteration should be.
>>>>>>>> At least I do not see them... I have an iteration and inside a lot of
>>>>>>>> filters/maps/etc. but not another iteration.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> Error:
>>>>>>>>
>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>> while translating the optimized plan to a nephele JobGraph: An error
>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>> Iterations are not possible at the moment!
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>>> Iterations are not possible at the moment!
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>> ... 14 more
>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>>> Iterations are not possible at the moment!
>>>>>>>> at
>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>> ... 33 more
>>>>>>>>
>>>>>>>> Code:
>>>>>>>>
>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>     val X = env readTextFile config.xFile map
>>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>     val residual = env readTextFile config.yFile map
>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>     val widthCandidates = env readTextFile
>>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>>> _)}
>>>>>>>>
>>>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>
>>>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>>> center)
>>>>>>>>
>>>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>>>> }
>>>>>>>>
>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>>> DataSet[Vector] = {
>>>>>>>>     val residual_2 = residual * residual
>>>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>>>>>> iteration} neutralize)
>>>>>>>>
>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>>>       (solutionset, workset) =>
>>>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>         def filter(x: Vector) = x.id ==
>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>        })
>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>
>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>           def map(x: Vector) = new
>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>        }),
>>>>>>>>       new_workset)
>>>>>>>>      }
>>>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>       var y: Vector = null
>>>>>>>>      override def open(config: Configuration) = {
>>>>>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>      }
>>>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>>
>>>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>     var index: Int = -1
>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>       val x: Tuple1[Int] =
>>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>       index = x._1
>>>>>>>>        }
>>>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>>>
>>>>>>>>     center neutralize
>>>>>>>> }
>>>>>>>>
>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>       var center: Vector = null
>>>>>>>>       var width: Vector = null
>>>>>>>>       override def open(config: Configuration) = {
>>>>>>>>        center =
>>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>        width =
>>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>      }
>>>>>>>>
>>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>>>> "width")
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>>>        (solutionset, workset) =>
>>>>>>>>        val currentWidth = workset filter (new
>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>          def filter(x: Vector) = x.id ==
>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>        })
>>>>>>>>
>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>
>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>
>>>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>>>
>>>>>>>>
>>>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>        def map(x: Vector) = new
>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>      }),
>>>>>>>>      workset)
>>>>>>>>     }
>>>>>>>>
>>>>>>>> // todo: will not work
>>>>>>>> //val width = costs max(0)
>>>>>>>>
>>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>
>>>>>>>> //val x1 = kernelVector dot residual
>>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>>> //val height = x1 / x2
>>>>>>>>     costs
>>>>>>>> }
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Stephan Ewen <se...@apache.org>.
We are not planning to add closed-loop nested iterations in the near
future. That is a bit of an effort and, so far, I think no one can pick
that up very soon.

We will be supporting roll-out iterations (for loop style) much more
efficiently soon. There is no reason why you could not nest two for-loops.
However, those are only bulk-style, not delta-iteration style.

If you would like to contribute iteration nesting, I could help you to get
started.

Greetings,
Stephan


On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Oh sorry, I just read the bug title. So my questions is when you are
> planning to add nested iterations?
>
> Cheers,
> Max
>
> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Ok, thanks.
>>
>> But the bug causes that it Flink "sees" a nested iteration where none is?
>> Or is it a bug that nested are not supported? If not when you plan to add
>> this feature?
>> Because I need nested iterations for my algorithm, so it would be nice to
>> know when I can expect them.
>>
>> Cheers,
>> Max
>>
>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> I found the cause of the bug and have opened a JIRA to track it.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>
>>> You can watch that one to keep updated.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> I am looking into it right now...
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> you already had time to investigate this issue?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hey!
>>>>>>
>>>>>> Clearly, this looks like a bug. Let me investigate that and get back
>>>>>> at you later...
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flinksters!
>>>>>>>
>>>>>>> First some good news: the cumsum code from the last issue works now
>>>>>>> correctly and is tested.
>>>>>>>
>>>>>>> Bad news (at least for me): I just run into this (for the error and
>>>>>>> code see below). You have a road map when this feature will be available?
>>>>>>> Regardless of the rest, I would need it in the near future.
>>>>>>>
>>>>>>> So far so good. But I wonder where this nested iteration should be.
>>>>>>> At least I do not see them... I have an iteration and inside a lot of
>>>>>>> filters/maps/etc. but not another iteration.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> Error:
>>>>>>>
>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>> Iterations are not possible at the moment!
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>>> Iterations are not possible at the moment!
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>> ... 14 more
>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>> Iterations are not possible at the moment!
>>>>>>> at
>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>> ... 33 more
>>>>>>>
>>>>>>> Code:
>>>>>>>
>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>     val X = env readTextFile config.xFile map
>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>>     val residual = env readTextFile config.yFile map
>>>>>>> {Vector.parseFromString(_)}
>>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>>> {Vector.parseFromString(_)}
>>>>>>>     val widthCandidates = env readTextFile
>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>> _)}
>>>>>>>
>>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>
>>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>> center)
>>>>>>>
>>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>>> }
>>>>>>>
>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>> DataSet[Vector] = {
>>>>>>>     val residual_2 = residual * residual
>>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>>>>> neutralize)
>>>>>>>
>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>>       (solutionset, workset) =>
>>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>>         def filter(x: Vector) = x.id ==
>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>        })
>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>
>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>           def map(x: Vector) = new
>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>        }),
>>>>>>>       new_workset)
>>>>>>>      }
>>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>       var y: Vector = null
>>>>>>>      override def open(config: Configuration) = {
>>>>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>      }
>>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>
>>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>     var index: Int = -1
>>>>>>>     override def open(config: Configuration) = {
>>>>>>>       val x: Tuple1[Int] =
>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>       index = x._1
>>>>>>>        }
>>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>>
>>>>>>>     center neutralize
>>>>>>> }
>>>>>>>
>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>       var center: Vector = null
>>>>>>>       var width: Vector = null
>>>>>>>       override def open(config: Configuration) = {
>>>>>>>        center =
>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>        width =
>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>      }
>>>>>>>
>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>>> "width")
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>>        (solutionset, workset) =>
>>>>>>>        val currentWidth = workset filter (new
>>>>>>> RichFilterFunction[Vector]{
>>>>>>>          def filter(x: Vector) = x.id ==
>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>        })
>>>>>>>
>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>
>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>
>>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>>
>>>>>>>
>>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>        def map(x: Vector) = new
>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>      }),
>>>>>>>      workset)
>>>>>>>     }
>>>>>>>
>>>>>>> // todo: will not work
>>>>>>> //val width = costs max(0)
>>>>>>>
>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>
>>>>>>> //val x1 = kernelVector dot residual
>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>> //val height = x1 / x2
>>>>>>>     costs
>>>>>>> }
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Maximilian Alber <al...@gmail.com>.
Oh sorry, I just read the bug title. So my question is: when are you
planning to add nested iterations?

Cheers,
Max

On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Ok, thanks.
>
> But the bug causes that it Flink "sees" a nested iteration where none is?
> Or is it a bug that nested are not supported? If not when you plan to add
> this feature?
> Because I need nested iterations for my algorithm, so it would be nice to
> know when I can expect them.
>
> Cheers,
> Max
>
> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> I found the cause of the bug and have opened a JIRA to track it.
>>
>> https://issues.apache.org/jira/browse/FLINK-1235
>>
>> You can watch that one to keep updated.
>>
>> Stephan
>>
>>
>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> I am looking into it right now...
>>>
>>> Stephan
>>>
>>>
>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> you already had time to investigate this issue?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hey!
>>>>>
>>>>> Clearly, this looks like a bug. Let me investigate that and get back
>>>>> at you later...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Hi Flinksters!
>>>>>>
>>>>>> First some good news: the cumsum code from the last issue works now
>>>>>> correctly and is tested.
>>>>>>
>>>>>> Bad news (at least for me): I just run into this (for the error and
>>>>>> code see below). You have a road map when this feature will be available?
>>>>>> Regardless of the rest, I would need it in the near future.
>>>>>>
>>>>>> So far so good. But I wonder where this nested iteration should be.
>>>>>> At least I do not see them... I have an iteration and inside a lot of
>>>>>> filters/maps/etc. but not another iteration.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> Error:
>>>>>>
>>>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>>>> Iterations are not possible at the moment!
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>>> Iterations are not possible at the moment!
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>> ... 14 more
>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>> Iterations are not possible at the moment!
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>> ... 33 more
>>>>>>
>>>>>> Code:
>>>>>>
>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>     val X = env readTextFile config.xFile map
>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>     val residual = env readTextFile config.yFile map
>>>>>> {Vector.parseFromString(_)}
>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>> {Vector.parseFromString(_)}
>>>>>>     val widthCandidates = env readTextFile config.widthCandidatesFile
>>>>>> map {Vector.parseFromString(config.dimensions, _)}
>>>>>>
>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>
>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>>
>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>> }
>>>>>>
>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>> DataSet[Vector] = {
>>>>>>     val residual_2 = residual * residual
>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>>>> neutralize)
>>>>>>
>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>       (solutionset, workset) =>
>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>         def filter(x: Vector) = x.id ==
>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>        })
>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>
>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>           def map(x: Vector) = new
>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>        }),
>>>>>>       new_workset)
>>>>>>      }
>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>       var y: Vector = null
>>>>>>      override def open(config: Configuration) = {
>>>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>      }
>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>
>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>     var index: Int = -1
>>>>>>     override def open(config: Configuration) = {
>>>>>>       val x: Tuple1[Int] =
>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>       index = x._1
>>>>>>        }
>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>
>>>>>>     center neutralize
>>>>>> }
>>>>>>
>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>       var center: Vector = null
>>>>>>       var width: Vector = null
>>>>>>       override def open(config: Configuration) = {
>>>>>>        center =
>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>        width =
>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>      }
>>>>>>
>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>> "width")
>>>>>> }
>>>>>>
>>>>>>
>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>        (solutionset, workset) =>
>>>>>>        val currentWidth = workset filter (new
>>>>>> RichFilterFunction[Vector]{
>>>>>>          def filter(x: Vector) = x.id ==
>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>        })
>>>>>>
>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>
>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>
>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>
>>>>>>
>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>        def map(x: Vector) = new
>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>      }),
>>>>>>      workset)
>>>>>>     }
>>>>>>
>>>>>> // todo: will not work
>>>>>> //val width = costs max(0)
>>>>>>
>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>
>>>>>> //val x1 = kernelVector dot residual
>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>> //val height = x1 / x2
>>>>>>     costs
>>>>>> }
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Maximilian Alber <al...@gmail.com>.
Ok, thanks.

But does the bug cause Flink to "see" a nested iteration where none is?
Or is it a bug that nested iterations are not supported? If not, when do
you plan to add this feature?
Because I need nested iterations for my algorithm, so it would be nice to
know when I can expect them.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <se...@apache.org> wrote:

> I found the cause of the bug and have opened a JIRA to track it.
>
> https://issues.apache.org/jira/browse/FLINK-1235
>
> You can watch that one to keep updated.
>
> Stephan
>
>
> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> I am looking into it right now...
>>
>> Stephan
>>
>>
>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>> you already had time to investigate this issue?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hey!
>>>>
>>>> Clearly, this looks like a bug. Let me investigate that and get back at
>>>> you later...
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Hi Flinksters!
>>>>>
>>>>> First some good news: the cumsum code from the last issue works now
>>>>> correctly and is tested.
>>>>>
>>>>> Bad news (at least for me): I just run into this (for the error and
>>>>> code see below). You have a road map when this feature will be available?
>>>>> Regardless of the rest, I would need it in the near future.
>>>>>
>>>>> So far so good. But I wonder where this nested iteration should be. At
>>>>> least I do not see them... I have an iteration and inside a lot of
>>>>> filters/maps/etc. but not another iteration.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> Error:
>>>>>
>>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>>> Iterations are not possible at the moment!
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>>> Iterations are not possible at the moment!
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>> at
>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>> ... 14 more
>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>> Iterations are not possible at the moment!
>>>>> at
>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>> ... 33 more
>>>>>
>>>>> Code:
>>>>>
>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>     val X = env readTextFile config.xFile map
>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>     val residual = env readTextFile config.yFile map
>>>>> {Vector.parseFromString(_)}
>>>>>     val randoms = env readTextFile config.randomFile map
>>>>> {Vector.parseFromString(_)}
>>>>>     val widthCandidates = env readTextFile config.widthCandidatesFile
>>>>> map {Vector.parseFromString(config.dimensions, _)}
>>>>>
>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>
>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>
>>>>>     x map { _ toString } writeAsText config.outFile
>>>>> }
>>>>>
>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>> DataSet[Vector] = {
>>>>>     val residual_2 = residual * residual
>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>>> neutralize)
>>>>>
>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>>>>> config.N+1, Array("id")) {
>>>>>       (solutionset, workset) =>
>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>         def filter(x: Vector) = x.id ==
>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>        })
>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>
>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>           def map(x: Vector) = new
>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>        }),
>>>>>       new_workset)
>>>>>      }
>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>       var y: Vector = null
>>>>>      override def open(config: Configuration) = {
>>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>      }
>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>
>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>     var index: Int = -1
>>>>>     override def open(config: Configuration) = {
>>>>>       val x: Tuple1[Int] =
>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>       index = x._1
>>>>>        }
>>>>>       def filter(x: Vector) = x.id == index
>>>>>     }).withBroadcastSet(index, "index")
>>>>>
>>>>>     center neutralize
>>>>> }
>>>>>
>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>       var center: Vector = null
>>>>>       var width: Vector = null
>>>>>       override def open(config: Configuration) = {
>>>>>        center =
>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>        width =
>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>      }
>>>>>
>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>> "width")
>>>>> }
>>>>>
>>>>>
>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>> config.NWidthCandidates, Array("id")) {
>>>>>        (solutionset, workset) =>
>>>>>        val currentWidth = workset filter (new
>>>>> RichFilterFunction[Vector]{
>>>>>          def filter(x: Vector) = x.id ==
>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>        })
>>>>>
>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>
>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>       val x2 = kernelVector dot kernelVector
>>>>>
>>>>>      val cost = (x1 / x2) neutralize
>>>>>
>>>>>
>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>        def map(x: Vector) = new
>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>      }),
>>>>>      workset)
>>>>>     }
>>>>>
>>>>> // todo: will not work
>>>>> //val width = costs max(0)
>>>>>
>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>
>>>>> //val x1 = kernelVector dot residual
>>>>> //val x2 = kernelVector dot kernelVector
>>>>> //val height = x1 / x2
>>>>>     costs
>>>>> }
>>>>>
>>>>
>>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Stephan Ewen <se...@apache.org>.
I found the cause of the bug and have opened a JIRA to track it.

https://issues.apache.org/jira/browse/FLINK-1235

You can watch that one to keep updated.

Stephan


On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> I am looking into it right now...
>
> Stephan
>
>
> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Have you already had time to investigate this issue?
>>
>> Cheers,
>> Max
>>
>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hey!
>>>
>>> Clearly, this looks like a bug. Let me investigate that and get back at
>>> you later...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Hi Flinksters!
>>>>
>>>> First some good news: the cumsum code from the last issue works now
>>>> correctly and is tested.
>>>>
>>>> Bad news (at least for me): I just ran into this (for the error and
>>>> code see below). Do you have a road map for when this feature will be available?
>>>> Regardless of the rest, I would need it in the near future.
>>>>
>>>> So far so good. But I wonder where this nested iteration is supposed to
>>>> be. At least I do not see it... I have an iteration with a lot of
>>>> filters/maps/etc. inside, but not another iteration.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> Error:
>>>>
>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>> Iterations are not possible at the moment!
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>> at
>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>> at
>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>> at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>> Iterations are not possible at the moment!
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>> at
>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>> at
>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>> at
>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>> ... 14 more
>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>> Iterations are not possible at the moment!
>>>> at
>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>> ... 33 more
>>>>
>>>> Code:
>>>>
>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>     val X = env readTextFile config.xFile map
>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>     val residual = env readTextFile config.yFile map
>>>> {Vector.parseFromString(_)}
>>>>     val randoms = env readTextFile config.randomFile map
>>>> {Vector.parseFromString(_)}
>>>>     val widthCandidates = env readTextFile config.widthCandidatesFile
>>>> map {Vector.parseFromString(config.dimensions, _)}
>>>>
>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>
>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>
>>>>     x map { _ toString } writeAsText config.outFile
>>>> }
>>>>
>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
>>>> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
>>>> = {
>>>>     val residual_2 = residual * residual
>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>> neutralize)
>>>>
>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>>>> config.N+1, Array("id")) {
>>>>       (solutionset, workset) =>
>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>         def filter(x: Vector) = x.id ==
>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>        })
>>>>       val old_sum = workset filter {_.id == -1}
>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>
>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>           def map(x: Vector) = new
>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>        }),
>>>>       new_workset)
>>>>      }
>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>       var y: Vector = null
>>>>      override def open(config: Configuration) = {
>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>      }
>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>
>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>     var index: Int = -1
>>>>     override def open(config: Configuration) = {
>>>>       val x: Tuple1[Int] =
>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>       index = x._1
>>>>        }
>>>>       def filter(x: Vector) = x.id == index
>>>>     }).withBroadcastSet(index, "index")
>>>>
>>>>     center neutralize
>>>> }
>>>>
>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width:
>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>       var center: Vector = null
>>>>       var width: Vector = null
>>>>       override def open(config: Configuration) = {
>>>>        center =
>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>        width =
>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>      }
>>>>
>>>>     def map(x: Vector) = new Vector(x.id,
>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>> "width")
>>>> }
>>>>
>>>>
>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>> config.NWidthCandidates, Array("id")) {
>>>>        (solutionset, workset) =>
>>>>        val currentWidth = workset filter (new
>>>> RichFilterFunction[Vector]{
>>>>          def filter(x: Vector) = x.id ==
>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>        })
>>>>
>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>
>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>       val x2 = kernelVector dot kernelVector
>>>>
>>>>      val cost = (x1 / x2) neutralize
>>>>
>>>>
>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>        def map(x: Vector) = new
>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>      }),
>>>>      workset)
>>>>     }
>>>>
>>>> // todo: will not work
>>>> //val width = costs max(0)
>>>>
>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>
>>>> //val x1 = kernelVector dot residual
>>>> //val x2 = kernelVector dot kernelVector
>>>> //val height = x1 / x2
>>>>     costs
>>>> }
>>>>
>>>
>>>
>>
>

Re: No Nested Iterations??? And where is the Nested Iteration?

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I am looking into it right now...

Stephan


On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi Stephan,
>
> Have you already had time to investigate this issue?
>
> Cheers,
> Max
>
> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hey!
>>
>> Clearly, this looks like a bug. Let me investigate that and get back at
>> you later...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Hi Flinksters!
>>>
>>> First some good news: the cumsum code from the last issue works now
>>> correctly and is tested.
>>>
>>> Bad news (at least for me): I just ran into this (for the error and code
>>> see below). Do you have a road map for when this feature will be available?
>>> Regardless of the rest, I would need it in the near future.
>>>
>>> So far so good. But I wonder where this nested iteration is supposed to
>>> be. At least I do not see it... I have an iteration with a lot of
>>> filters/maps/etc. inside, but not another iteration.
>>>
>>> Cheers,
>>> Max
>>>
>>> Error:
>>>
>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>> while translating the optimized plan to a nephele JobGraph: Nested
>>> Iterations are not possible at the moment!
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>> at
>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>> Iterations are not possible at the moment!
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>> at
>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>> at
>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>> ... 14 more
>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>> Iterations are not possible at the moment!
>>> at
>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>> ... 33 more
>>>
>>> Code:
>>>
>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>     val X = env readTextFile config.xFile map
>>> {Vector.parseFromString(config.dimensions, _)}
>>>     val residual = env readTextFile config.yFile map
>>> {Vector.parseFromString(_)}
>>>     val randoms = env readTextFile config.randomFile map
>>> {Vector.parseFromString(_)}
>>>     val widthCandidates = env readTextFile config.widthCandidatesFile
>>> map {Vector.parseFromString(config.dimensions, _)}
>>>
>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>
>>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>
>>>     x map { _ toString } writeAsText config.outFile
>>> }
>>>
>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
>>> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
>>> = {
>>>     val residual_2 = residual * residual
>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>> neutralize)
>>>
>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>>> config.N+1, Array("id")) {
>>>       (solutionset, workset) =>
>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>         def filter(x: Vector) = x.id ==
>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>        })
>>>       val old_sum = workset filter {_.id == -1}
>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>
>>>       val new_workset = workset filter {_.id != -1} union sum
>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>           def map(x: Vector) = new
>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>        }),
>>>       new_workset)
>>>      }
>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>       var y: Vector = null
>>>      override def open(config: Configuration) = {
>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>      }
>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>
>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>     var index: Int = -1
>>>     override def open(config: Configuration) = {
>>>       val x: Tuple1[Int] =
>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>       index = x._1
>>>        }
>>>       def filter(x: Vector) = x.id == index
>>>     }).withBroadcastSet(index, "index")
>>>
>>>     center neutralize
>>> }
>>>
>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width:
>>> DataSet[Vector]): DataSet[Vector] = {
>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>       var center: Vector = null
>>>       var width: Vector = null
>>>       override def open(config: Configuration) = {
>>>        center =
>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>        width =
>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>      }
>>>
>>>     def map(x: Vector) = new Vector(x.id,
>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>> "width")
>>> }
>>>
>>>
>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>> DataSet[Vector]): DataSet[Vector] = {
>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>> config.NWidthCandidates, Array("id")) {
>>>        (solutionset, workset) =>
>>>        val currentWidth = workset filter (new RichFilterFunction[Vector]{
>>>          def filter(x: Vector) = x.id ==
>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>        })
>>>
>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>
>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>       val x2 = kernelVector dot kernelVector
>>>
>>>      val cost = (x1 / x2) neutralize
>>>
>>>
>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>        def map(x: Vector) = new
>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>      }),
>>>      workset)
>>>     }
>>>
>>> // todo: will not work
>>> //val width = costs max(0)
>>>
>>> //val kernelVector = getKernelVector(X, center, width)
>>>
>>> //val x1 = kernelVector dot residual
>>> //val x2 = kernelVector dot kernelVector
>>> //val height = x1 / x2
>>>     costs
>>> }
>>>
>>
>>
>