You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Maximilian Alber <al...@gmail.com> on 2014/12/09 11:21:37 UTC
It is currently not supported to union between dynamic and static
path in an iteration.
Hi flinksters,
I'm really close to the end (at least I hope so), but I still have some
issues.
Writing my final loop I got this error:
org.apache.flink.compiler.CompilerException: An error occurred while
translating the optimized plan to a nephele JobGraph: An error occurred
while translating the optimized plan to a nephele JobGraph: Error: It is
currently not supported to union between dynamic and static path in an
iteration.
at
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
at
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
I guess that I should use just the loop step set to create the next step
set?
Here is the code:
def createPlanFullIterations(env: ExecutionEnvironment) = {
val tmp = env readTextFile config.inFile map
{Vector.parseFromSVMLightString (config.dimensions, _)}
val X = tmp map {_._1}
var residual = tmp map {_._2}
val randoms = env readTextFile config.randomFile map
{Vector.parseFromString(_)}
val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile !=
null)
env readTextFile config.widthCandidatesFile map
{Vector.parseFromString(config.dimensions, _)}
else
null
val emptyDataSet = env.fromCollection[Vector](Seq())
val model = emptyDataSet.iterate(config.iterations){
stepSet =>
val center = calcCenter(env, X, residual, randoms, -1)
val x = calcWidthHeight(env, X, residual, widthCandidates, center)
val width = x._1
val height = x._2
residual = residual - (getKernelVector(X, center, width).map(new
RichMapFunction[Vector, Vector]{
var height: Vector = null
override def open(config: Configuration) = {
height =
getRuntimeContext.getBroadcastVariable("height").toList.head
}
def map(x: Vector) = {x * height}
}).withBroadcastSet(height, "height"))
val centerOut = center map {x => new Vector(0, x.values)}
val widthOut = width map {x => new Vector(1, x.values)}
val heightOut = height map {x => new Vector(2, x.values)}
val stepModel = centerOut union widthOut union height
stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
def map(x: Vector) = new
Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
}))
}
model map { _ toString } writeAsText config.outFile
}
thanks!
Cheers,
Max
Re: It is currently not supported to union between dynamic and static
path in an iteration.
Posted by Maximilian Alber <al...@gmail.com>.
Thanks!
On Wed, Dec 10, 2014 at 4:35 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
>
> Yes, that is a known limitation. It is sort of straightforward to resolve,
> but will take a bit of time. I will try and get to it until the end of the
> week.
>
> Stephan
>
>
> On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi flinksters,
>>
>> I'm really close to the end (at least I hope so), but I still have some
>> issues.
>> Writing my final loop I got this error:
>>
>> org.apache.flink.compiler.CompilerException: An error occurred while
>> translating the optimized plan to a nephele JobGraph: An error occurred
>> while translating the optimized plan to a nephele JobGraph: Error: It is
>> currently not supported to union between dynamic and static path in an
>> iteration.
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
>> at
>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>> at
>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>
>> I guess that I should use just the loop step set to create the next step
>> set?
>>
>> Here is the code:
>>
>> def createPlanFullIterations(env: ExecutionEnvironment) = {
>> val tmp = env readTextFile config.inFile map
>> {Vector.parseFromSVMLightString (config.dimensions, _)}
>> val X = tmp map {_._1}
>> var residual = tmp map {_._2}
>> val randoms = env readTextFile config.randomFile map
>> {Vector.parseFromString(_)}
>> val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile
>> != null)
>> env readTextFile config.widthCandidatesFile map
>> {Vector.parseFromString(config.dimensions, _)}
>> else
>> null
>>
>> val emptyDataSet = env.fromCollection[Vector](Seq())
>> val model = emptyDataSet.iterate(config.iterations){
>> stepSet =>
>> val center = calcCenter(env, X, residual, randoms, -1)
>>
>> val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>> val width = x._1
>> val height = x._2
>>
>> residual = residual - (getKernelVector(X, center, width).map(new
>> RichMapFunction[Vector, Vector]{
>> var height: Vector = null
>> override def open(config: Configuration) = {
>> height =
>> getRuntimeContext.getBroadcastVariable("height").toList.head
>> }
>> def map(x: Vector) = {x * height}
>> }).withBroadcastSet(height, "height"))
>>
>> val centerOut = center map {x => new Vector(0, x.values)}
>> val widthOut = width map {x => new Vector(1, x.values)}
>> val heightOut = height map {x => new Vector(2, x.values)}
>> val stepModel = centerOut union widthOut union height
>>
>> stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>> def map(x: Vector) = new
>> Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id,
>> x.values)
>> }))
>> }
>>
>> model map { _ toString } writeAsText config.outFile
>> }
>>
>>
>> thanks!
>> Cheers,
>> Max
>>
>
>
Re: It is currently not supported to union between dynamic and static
path in an iteration.
Posted by Stephan Ewen <se...@apache.org>.
Hi!
Yes, that is a known limitation. It is sort of straightforward to resolve,
but will take a bit of time. I will try and get to it until the end of the
week.
Stephan
On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:
> Hi flinksters,
>
> I'm really close to the end (at least I hope so), but I still have some
> issues.
> Writing my final loop I got this error:
>
> org.apache.flink.compiler.CompilerException: An error occurred while
> translating the optimized plan to a nephele JobGraph: An error occurred
> while translating the optimized plan to a nephele JobGraph: Error: It is
> currently not supported to union between dynamic and static path in an
> iteration.
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>
> I guess that I should use just the loop step set to create the next step
> set?
>
> Here is the code:
>
> def createPlanFullIterations(env: ExecutionEnvironment) = {
> val tmp = env readTextFile config.inFile map
> {Vector.parseFromSVMLightString (config.dimensions, _)}
> val X = tmp map {_._1}
> var residual = tmp map {_._2}
> val randoms = env readTextFile config.randomFile map
> {Vector.parseFromString(_)}
> val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile !=
> null)
> env readTextFile config.widthCandidatesFile map
> {Vector.parseFromString(config.dimensions, _)}
> else
> null
>
> val emptyDataSet = env.fromCollection[Vector](Seq())
> val model = emptyDataSet.iterate(config.iterations){
> stepSet =>
> val center = calcCenter(env, X, residual, randoms, -1)
>
> val x = calcWidthHeight(env, X, residual, widthCandidates, center)
> val width = x._1
> val height = x._2
>
> residual = residual - (getKernelVector(X, center, width).map(new
> RichMapFunction[Vector, Vector]{
> var height: Vector = null
> override def open(config: Configuration) = {
> height =
> getRuntimeContext.getBroadcastVariable("height").toList.head
> }
> def map(x: Vector) = {x * height}
> }).withBroadcastSet(height, "height"))
>
> val centerOut = center map {x => new Vector(0, x.values)}
> val widthOut = width map {x => new Vector(1, x.values)}
> val heightOut = height map {x => new Vector(2, x.values)}
> val stepModel = centerOut union widthOut union height
>
> stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
> def map(x: Vector) = new
> Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
> }))
> }
>
> model map { _ toString } writeAsText config.outFile
> }
>
>
> thanks!
> Cheers,
> Max
>