You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Maximilian Alber <al...@gmail.com> on 2014/11/13 12:32:31 UTC
The given strategy does not work on two inputs.
Hi Flinksters!
The current stable Flink compiler rejects my plan, but I don't have a clue
why. The offending line of code is marked (between the two ">>>" rows):
Code:
/**
 * Reproduction case from the report: runs a delta iteration over
 * `widthCandidates`, takes the maximum of the resulting `costs`, and joins
 * that back against `widthCandidates` on "id" to return the matching entry.
 *
 * Despite the name, only `width` is returned; the height computation is
 * commented out at the bottom of the body.
 *
 * NOTE(review): `Vector` here is a project-declared type with `id` and
 * `values` members (not scala.collection.immutable.Vector); `config`,
 * `getKernelVector`, `dot`, `/` and `neutralize` are defined elsewhere and
 * their semantics are not visible from this snippet.
 *
 * @param env             execution environment, used to create the empty initial data set
 * @param X               passed through to `getKernelVector`
 * @param residual        combined with the kernel vector via `dot`
 * @param widthCandidates initial workset of the iteration and right-hand side of the final join
 * @param center          passed through to `getKernelVector`
 * @return the `widthCandidates` side of the join with the maximum cost
 */
def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
DataSet[Vector]): DataSet[Vector] = {
// The iteration starts from an empty data set and is keyed on "id".
val emptyDataSet = env.fromCollection[Vector](Seq())
val costs = emptyDataSet.iterateDelta(widthCandidates,
config.NWidthCandidates, Array("id")) {
(solutionset, workset) =>
// Keep only the workset element whose id equals (superstep number - 1).
val currentWidth = workset filter (new RichFilterFunction[Vector]{
def filter(x: Vector) = x.id ==
(getIterationRuntimeContext.getSuperstepNumber-1)
})
val kernelVector = getKernelVector(X, center, currentWidth)
val x1 = kernelVector dot residual map {x => x*x}
val x2 = kernelVector dot kernelVector
val cost = (x1 / x2) neutralize
// Step result: the cost re-tagged with id (superstep number - 1), paired
// with the workset passed through unchanged.
(cost map (new RichMapFunction[Vector, Vector]{
def map(x: Vector) = new
Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
}),
workset)
}
val maxCost = costs max(0)
// The join between the two marker rows below is the statement the reporter
// identifies as triggering the IllegalArgumentException during plan compilation.
>>>>>>>>>>>>>>>>>>>>>>>>>
val width = maxCost join widthCandidates where "id" equalTo "id" map {x
=> x._2}
>>>>>>>>>>>>>>>>>>>>>>>>>
//val kernelVector = getKernelVector(X, center, width)
//val x1 = kernelVector dot residual
//val x2 = kernelVector dot kernelVector
//val height = x1 / x2
//costs
width
}
The error message is:
java.lang.IllegalArgumentException: The given strategy does not work on two
inputs.
at
org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
at
org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
at
org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
at
org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
at
org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
at
org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
at
org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
at
org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at
org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at
org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Thanks!
Cheers,
Max
Re: The given strategy does not work on two inputs.
Posted by Stephan Ewen <se...@apache.org>.
Let us have a look...
On Thu, Nov 13, 2014 at 12:32 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:
> Hi Flinksters!
>
> The current stable Flink compiler rejects my plan. But I don't have a clue
> why. The causing line of code is marked:
>
> Code:
>
> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
> DataSet[Vector]): DataSet[Vector] = {
> val emptyDataSet = env.fromCollection[Vector](Seq())
> val costs = emptyDataSet.iterateDelta(widthCandidates,
> config.NWidthCandidates, Array("id")) {
> (solutionset, workset) =>
> val currentWidth = workset filter (new RichFilterFunction[Vector]{
> def filter(x: Vector) = x.id ==
> (getIterationRuntimeContext.getSuperstepNumber-1)
> })
>
> val kernelVector = getKernelVector(X, center, currentWidth)
>
> val x1 = kernelVector dot residual map {x => x*x}
> val x2 = kernelVector dot kernelVector
>
> val cost = (x1 / x2) neutralize
>
>
> (cost map (new RichMapFunction[Vector, Vector]{
> def map(x: Vector) = new
> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
> }),
> workset)
> }
>
> val maxCost = costs max(0)
> >>>>>>>>>>>>>>>>>>>>>>>>>
> val width = maxCost join widthCandidates where "id" equalTo "id" map
> {x => x._2}
> >>>>>>>>>>>>>>>>>>>>>>>>>
>
> //val kernelVector = getKernelVector(X, center, width)
>
> //val x1 = kernelVector dot residual
> //val x2 = kernelVector dot kernelVector
> //val height = x1 / x2
> //costs
> width
> }
>
>
> The error message is:
>
> java.lang.IllegalArgumentException: The given strategy does not work on
> two inputs.
> at
> org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
> at
> org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
> at
> org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
> at
> org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
> at
> org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
> at
> org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
> at
> org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
> at org.apache.flink.client.program.Client.run(Client.java:285)
> at org.apache.flink.client.program.Client.run(Client.java:230)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
>