You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Maximilian Alber <al...@gmail.com> on 2014/11/13 12:32:31 UTC

The given strategy does not work on two inputs.

Hi Flinksters!

The current stable Flink compiler rejects my plan, but I don't have a clue
why. The line of code that causes the error is marked below:

Code:

/**
 * Builds a Flink plan that scores width candidates inside a delta iteration
 * and returns the `widthCandidates` record(s) whose "id" matches the
 * max-aggregated cost record.
 *
 * Despite the name, only the width is returned; the height computation at
 * the bottom of the body is commented out.
 *
 * NOTE(review): `Vector` here is a project type with at least `id` and
 * `values` members (not scala.collection.immutable.Vector); `dot`, `/`,
 * `neutralize` and `getKernelVector` are project helpers whose semantics are
 * not visible in this snippet.
 *
 * @param env             execution environment, used only to create the empty
 *                        initial solution set
 * @param X               passed through to `getKernelVector`
 * @param residual        combined with the kernel vector via `dot`
 * @param widthCandidates initial workset of the iteration and right side of
 *                        the final join on "id"
 * @param center          passed through to `getKernelVector`
 * @return the second element of each (maxCost, widthCandidate) join pair
 */
def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
DataSet[Vector]): DataSet[Vector] = {
    // Empty data set used as the receiver of iterateDelta, i.e. (per the
    // Flink Scala API) the initial solution set.
    val emptyDataSet = env.fromCollection[Vector](Seq())
    // Delta iteration: widthCandidates is the workset, at most
    // config.NWidthCandidates supersteps, solution set keyed on "id".
    val costs = emptyDataSet.iterateDelta(widthCandidates,
config.NWidthCandidates, Array("id")) {
      (solutionset, workset) =>
      // Select the workset element(s) whose id equals the current superstep
      // number minus one (presumably supersteps are 1-based and ids 0-based
      // -- TODO confirm).
      val currentWidth = workset filter (new RichFilterFunction[Vector]{
       def filter(x: Vector) = x.id ==
(getIterationRuntimeContext.getSuperstepNumber-1)
      })

     val kernelVector = getKernelVector(X, center, currentWidth)

     // x1 = (kernelVector dot residual) with each element squared;
     // x2 = kernelVector dot kernelVector.
     val x1 = kernelVector dot residual map {x => x*x}
     val x2 = kernelVector dot kernelVector

     val cost = (x1 / x2) neutralize


     // Step result: (solution-set delta, next workset). The delta is the cost
     // re-tagged with id = superstep - 1; the workset is handed back
     // unchanged, so `solutionset` itself is never read in the step function.
     (cost map (new RichMapFunction[Vector, Vector]{
      def map(x: Vector) = new
Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
     }),
     workset)
    }

    // Max aggregation over field position 0 of the iteration result (which
    // member of Vector that position maps to is not visible here).
    val maxCost = costs max(0)
    // The '>' marker lines around the next statement are the poster's
    // highlighting of the statement that triggers the reported error; they
    // are not Scala. The statement joins maxCost with widthCandidates on "id"
    // and keeps the widthCandidates side of each pair.
>>>>>>>>>>>>>>>>>>>>>>>>>
    val width = maxCost join widthCandidates where "id" equalTo "id" map {x
=> x._2}
>>>>>>>>>>>>>>>>>>>>>>>>>

    //val kernelVector = getKernelVector(X, center, width)

    //val x1 = kernelVector dot residual
    //val x2 = kernelVector dot kernelVector
    //val height = x1 / x2
    //costs
    width
}


The error message is:

java.lang.IllegalArgumentException: The given strategy does not work on two
inputs.
at
org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
at
org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
at
org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
at
org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
at
org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
at
org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
at
org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
at
org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at
org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at
org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!
Cheers,
Max

Re: The given strategy does not work on two inputs.

Posted by Stephan Ewen <se...@apache.org>.
Let us have a look...

On Thu, Nov 13, 2014 at 12:32 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi Flinksters!
>
> The current stable Flink compiler rejects my plan, but I don't have a clue
> why. The line of code that causes the error is marked below:
>
> Code:
>
> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
> DataSet[Vector]): DataSet[Vector] = {
>     val emptyDataSet = env.fromCollection[Vector](Seq())
>     val costs = emptyDataSet.iterateDelta(widthCandidates,
> config.NWidthCandidates, Array("id")) {
>       (solutionset, workset) =>
>       val currentWidth = workset filter (new RichFilterFunction[Vector]{
>        def filter(x: Vector) = x.id ==
> (getIterationRuntimeContext.getSuperstepNumber-1)
>       })
>
>      val kernelVector = getKernelVector(X, center, currentWidth)
>
>      val x1 = kernelVector dot residual map {x => x*x}
>      val x2 = kernelVector dot kernelVector
>
>      val cost = (x1 / x2) neutralize
>
>
>      (cost map (new RichMapFunction[Vector, Vector]{
>       def map(x: Vector) = new
> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>      }),
>      workset)
>     }
>
>     val maxCost = costs max(0)
> >>>>>>>>>>>>>>>>>>>>>>>>>
>     val width = maxCost join widthCandidates where "id" equalTo "id" map
> {x => x._2}
> >>>>>>>>>>>>>>>>>>>>>>>>>
>
>     //val kernelVector = getKernelVector(X, center, width)
>
>     //val x1 = kernelVector dot residual
>     //val x2 = kernelVector dot kernelVector
>     //val height = x1 / x2
>     //costs
>     width
> }
>
>
> The error message is:
>
> java.lang.IllegalArgumentException: The given strategy does not work on
> two inputs.
> at
> org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
> at
> org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
> at
> org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
> at
> org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
> at
> org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
> at
> org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
> at
> org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
> at org.apache.flink.client.program.Client.run(Client.java:285)
> at org.apache.flink.client.program.Client.run(Client.java:230)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
>