You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Stephan Ewen <se...@apache.org> on 2014/11/10 17:13:26 UTC

Re: Forced to use Solution Set in Step Function

Hi!

The 0.7.0 release should have fixed that problem. Have you had a chance to
try that out?

Greetings,
Stephan


On Wed, Oct 15, 2014 at 3:43 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> @Aljoscha Sorry, I just tried my workaround. There are some minor
> conceptual bugs (caused by the id's...). I attached the new version.
> Unfortunately there the compiler breaks. An issue is already open.
>
> On Wed, Oct 15, 2014 at 11:53 AM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Ok.
>>
>> Here is a input variant:
>> flink run -v bump_boost-0.1.jar -c bumpboost.Job x_file=X y_file=Y
>> out_file=/tmp/tmpnWYamw random_file=random_file dimensions=1 N=100
>> width_candidates_file=/tmp/tmpTJ4LDh iterations=30 multi_bump_boost=0
>> gradient_descent_iterations=30 cache=False min_width=-4 max_width=6
>> min_width_update=1e-08 max_width_update=10
>>
>> width_candidates_file is not needed by now. X, Y and random_file are
>> attached.
>>
>> If you have problems, running it, let me know!
>> Thanks!
>>
>> On Tue, Oct 14, 2014 at 10:40 PM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Transferring to variables: Unfortunately not possible right now but we
>>> are working on it.
>>>
>>> On Tue, Oct 14, 2014 at 8:53 PM, Maximilian Alber
>>> <al...@gmail.com> wrote:
>>> > @ Stephan: Thanks! So I gonna switch!
>>> >
>>> > Sorry, my bad. I will provide you some sample by tomorrow morning.
>>> >
>>> > Yes. Workaround, because I cannot transfer them into variables, can I
>>> by now
>>> > (or will I ever)?
>>> >
>>> > Maybe some explanation to my solution:
>>> > - X is for my a matrix of shape (N, d). Modeled in Flink as dataset of
>>> > vectors. Each Vector has an ID which is the row number and an array
>>> with
>>> > numbers, the actual row.
>>> > - Y is for my a matrix of shape (N, 1) thus actually a column-vector.
>>> > - old_sum is either a scalar if d == 1 or a row-vector aka matrix of
>>> shape
>>> > (1, N) or a Dataset with one Vector. (By now I have the convention to
>>> give
>>> > id -1 to them, comes from a former workaround...)
>>> >
>>> > The whole ID story comes from the fact that I need to know which stuff
>>> > belongs together in mathematical operations (see my zip function). You
>>> can
>>> > look that up in util.scala, that's kind of my math library. I don't
>>> want to
>>> > imagine the mess in Java :)
>>> >
>>> > Cheers
>>> > Max
>>> >
>>> >
>>> >
>>> > On Tue, Oct 14, 2014 at 6:28 PM, Aljoscha Krettek <aljoscha@apache.org
>>> >
>>> > wrote:
>>> >>
>>> >> Could you maybe also give some examples for the input expected by your
>>> >> program?
>>> >>
>>> >> Also, the residual DataSet contains several Vectors while the sum (or
>>> >> old_sum) DataSet is always only contains 1 Vector. Correct?
>>> >>
>>> >> On Tue, Oct 14, 2014 at 6:04 PM, Stephan Ewen <se...@apache.org>
>>> wrote:
>>> >> > BTW: The current master allows you to not join with the solution
>>> set,
>>> >> > and
>>> >> > only use it to accumulate data.
>>> >> >
>>> >> > On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber
>>> >> > <al...@gmail.com> wrote:
>>> >> >>
>>> >> >> Ok, that's possible too.
>>> >> >>
>>> >> >> VectorDataSet is just scala magic to ease my life (See below). If
>>> you
>>> >> >> want
>>> >> >> to take a look, I appended the package. The main code is in
>>> >> >> BumpBoost.scala.
>>> >> >> In util.scala is the vector stuff.
>>> >> >> Thanks!
>>> >> >>
>>> >> >> class VectorDataSet(X: DataSet[Vector]){
>>> >> >> def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
>>> >> >> def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
>>> >> >> def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
>>> >> >> def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>>> >> >>
>>> >> >> def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
>>> >> >> def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
>>> >> >> def sumV() = VectorDataSet.sumV(X)
>>> >> >> }
>>> >> >> object VectorDataSet {
>>> >> >> def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map
>>> {x =>
>>> >> >> x._1 + x._2}
>>> >> >> def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map
>>> {x
>>> >> >> =>
>>> >> >> x._1 - x._2}
>>> >> >> def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map
>>> {x
>>> >> >> =>
>>> >> >> x._1 * x._2}
>>> >> >> def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map
>>> {x =>
>>> >> >> x._1 / x._2}
>>> >> >>
>>> >> >> def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2
>>> where
>>> >> >> "id"
>>> >> >> equalTo "id"
>>> >> >> def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
>>> >> >> def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>>> >> >>
>>> >> >> implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new
>>> >> >> VectorDataSet(ds)
>>> >> >> }
>>> >> >>
>>> >> >>
>>> >> >> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <
>>> aljoscha@apache.org>
>>> >> >> wrote:
>>> >> >>>
>>> >> >>> Maybe you could use the residual_2 data set as a broadcast
>>> dataset.
>>> >> >>> i.e. make in available in the operation that adds the residual
>>> for the
>>> >> >>> current iteration number to the old_sum. (I'm not sure what the
>>> >> >>> VectorDataSet.add() method does here). If you gave me the complete
>>> >> >>> code I could try finding an elegant solution to that problem.
>>> >> >>>
>>> >> >>> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <se...@apache.org>
>>> >> >>> wrote:
>>> >> >>> > That is an interesting case. Everything that is loop invariant
>>> is
>>> >> >>> > computed
>>> >> >>> > once outside the loop. You are looking for a way to make this
>>> part
>>> >> >>> > of
>>> >> >>> > the
>>> >> >>> > loop.
>>> >> >>> >
>>> >> >>> > Can you try making the filter part of the
>>> >> >>> > "VectorDataSet.add(old_sum,
>>> >> >>> > current)" operation?
>>> >> >>> >
>>> >> >>> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
>>> >> >>> > <al...@gmail.com> wrote:
>>> >> >>> >>
>>> >> >>> >> The deltaiteration calculates the cumulative sum (prefix sum)
>>> of
>>> >> >>> >> residual_2.
>>> >> >>> >>
>>> >> >>> >> I'm not sure if got you idea. I could add residual_2 to the
>>> >> >>> >> workset.
>>> >> >>> >> But
>>> >> >>> >> in that case I need to separate the "old_sum"(the sum up to
>>> now)
>>> >> >>> >> and
>>> >> >>> >> residual_2 each iteration. Actually I had that before, then I
>>> tried
>>> >> >>> >> the
>>> >> >>> >> Scala closure to clean up the code.
>>> >> >>> >>
>>> >> >>> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <
>>> fhueske@apache.org>
>>> >> >>> >> wrote:
>>> >> >>> >>>
>>> >> >>> >>> Jep, I see you point.
>>> >> >>> >>> Conceptually, all data that changes and affects the result of
>>> an
>>> >> >>> >>> iteration should be part of the workset.
>>> >> >>> >>> Hence, the model kind of assumes that the datum
>>> "superStepNumber"
>>> >> >>> >>> should
>>> >> >>> >>> be part of the workset.
>>> >> >>> >>>
>>> >> >>> >>> I am not familiar with your application, but would it make
>>> sense
>>> >> >>> >>> to
>>> >> >>> >>> add
>>> >> >>> >>> the number as an additional attribute to the workset data set
>>> and
>>> >> >>> >>> increase
>>> >> >>> >>> it manually?
>>> >> >>> >>>
>>> >> >>> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber
>>> >> >>> >>> <al...@gmail.com>:
>>> >> >>> >>>>
>>> >> >>> >>>> Ok, sounds true, but somehow I would like to execute it
>>> inside of
>>> >> >>> >>>> it. So
>>> >> >>> >>>> I probably need to do some nonsense work to make it part of
>>> it?
>>> >> >>> >>>>
>>> >> >>> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek
>>> >> >>> >>>> <al...@apache.org>
>>> >> >>> >>>> wrote:
>>> >> >>> >>>>>
>>> >> >>> >>>>> Dammit you beat me to it. But yes, this is exactly what I
>>> was
>>> >> >>> >>>>> just
>>> >> >>> >>>>> writing.
>>> >> >>> >>>>>
>>> >> >>> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske
>>> >> >>> >>>>> <fh...@apache.org>
>>> >> >>> >>>>> wrote:
>>> >> >>> >>>>> > Hi,
>>> >> >>> >>>>> >
>>> >> >>> >>>>> > I'm not super familiar with the iterations, but my guess
>>> would
>>> >> >>> >>>>> > be
>>> >> >>> >>>>> > that the
>>> >> >>> >>>>> > filter is not evaluated as part of the iteration.
>>> >> >>> >>>>> > Since it is not connect to the workset, the filter is not
>>> part
>>> >> >>> >>>>> > of
>>> >> >>> >>>>> > the
>>> >> >>> >>>>> > loop
>>> >> >>> >>>>> > and evaluated once outside where no superset number is
>>> >> >>> >>>>> > available.
>>> >> >>> >>>>> > I guess, moving the filter outside of the loop gives the
>>> same
>>> >> >>> >>>>> > error.
>>> >> >>> >>>>> >
>>> >> >>> >>>>> > Cheers, Fabian
>>> >> >>> >>>>> >
>>> >> >>> >>>>> >
>>> >> >>> >>>>> >
>>> >> >>> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>>> >> >>> >>>>> > <al...@gmail.com>:
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> Hmm or maybe not. With this code I get some strange
>>> error:
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>>> >> >>> >>>>> >> val X = env readTextFile config.xFile map
>>> >> >>> >>>>> >> {Vector.parseFromString(config.dimensions, _)};
>>> >> >>> >>>>> >> val residual = env readTextFile config.yFile map
>>> >> >>> >>>>> >> {Vector.parseFromString(_)};
>>> >> >>> >>>>> >> val randoms = env readTextFile config.randomFile map
>>> >> >>> >>>>> >> {Vector.parseFromString(_)}
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> val residual_2 = residual * residual
>>> >> >>> >>>>> >> val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> val emptyDataSet = env.fromCollection[Vector](Seq())
>>> >> >>> >>>>> >> val sumVector =
>>> >> >>> >>>>> >> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>> >> >>> >>>>> >> val cumSum = emptyDataSet.iterateDelta(sumVector,
>>> config.N,
>>> >> >>> >>>>> >> Array("id")) {
>>> >> >>> >>>>> >>     (solutionset, old_sum) =>
>>> >> >>> >>>>> >>     val current = residual_2 filter (new
>>> >> >>> >>>>> >> RichFilterFunction[Vector]{
>>> >> >>> >>>>> >>       def filter(x: Vector) = x.id ==
>>> >> >>> >>>>> >> (getIterationRuntimeContext.getSuperstepNumber)
>>> >> >>> >>>>> >>     })
>>> >> >>> >>>>> >>     val sum = VectorDataSet.add(old_sum, current)
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>     (sum map (new RichMapFunction[Vector, Vector]{
>>> >> >>> >>>>> >>       def map(x: Vector) = new
>>> >> >>> >>>>> >> Vector(getIterationRuntimeContext.getSuperstepNumber,
>>> >> >>> >>>>> >> x.values)
>>> >> >>> >>>>> >>     }),
>>> >> >>> >>>>> >>     sum)
>>> >> >>> >>>>> >> }
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> Error:
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status
>>> RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>> switched to
>>> >> >>> >>>>> >> SCHEDULED
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>> switched to
>>> >> >>> >>>>> >> DEPLOYING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>> >> >>> >>>>> >> SCHEDULED
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>> >> >>> >>>>> >> DEPLOYING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>> >> >>> >>>>> >> UTF-8) -> Map
>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>> >> >>> >>>>> >> (1/1)
>>> >> >>> >>>>> >> switched to
>>> >> >>> >>>>> >> SCHEDULED
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>> >> >>> >>>>> >> UTF-8) -> Map
>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>> >> >>> >>>>> >> (1/1)
>>> >> >>> >>>>> >> switched to
>>> >> >>> >>>>> >> DEPLOYING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>> switched to
>>> >> >>> >>>>> >> RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>> (Unnamed
>>> >> >>> >>>>> >> Delta
>>> >> >>> >>>>> >> Iteration)) (1/1) switched to SCHEDULED
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>> (Unnamed
>>> >> >>> >>>>> >> Delta
>>> >> >>> >>>>> >> Iteration)) (1/1) switched to DEPLOYING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>> >> >>> >>>>> >> UTF-8) -> Map
>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>> >> >>> >>>>> >> (1/1)
>>> >> >>> >>>>> >> switched to
>>> >> >>> >>>>> >> RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>> >> >>> >>>>> >> RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>> (Unnamed
>>> >> >>> >>>>> >> Delta
>>> >> >>> >>>>> >> Iteration)) (1/1) switched to RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>> to
>>> >> >>> >>>>> >> SCHEDULED
>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>> to
>>> >> >>> >>>>> >> DEPLOYING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>> switched
>>> >> >>> >>>>> >> to
>>> >> >>> >>>>> >> SCHEDULED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>> switched
>>> >> >>> >>>>> >> to
>>> >> >>> >>>>> >> DEPLOYING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>> to
>>> >> >>> >>>>> >> RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>> switched
>>> >> >>> >>>>> >> to
>>> >> >>> >>>>> >> RUNNING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>> to
>>> >> >>> >>>>> >> FAILED
>>> >> >>> >>>>> >> java.lang.IllegalStateException: This stub is not part
>>> of an
>>> >> >>> >>>>> >> iteration
>>> >> >>> >>>>> >> step function.
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>> >> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
>>> FAILING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>>> >> >>> >>>>> >> CANCELING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>> >> >>> >>>>> >> UTF-8) -> Map
>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>> >> >>> >>>>> >> (1/1)
>>> >> >>> >>>>> >> switched to
>>> >> >>> >>>>> >> CANCELING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1)
>>> switched to
>>> >> >>> >>>>> >> CANCELING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
>>> (Unnamed
>>> >> >>> >>>>> >> Delta
>>> >> >>> >>>>> >> Iteration)) (1/1) switched to CANCELING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1)
>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat
>>> >> >>> >>>>> >> (/tmp/tmplSYJ7S)
>>> >> >>> >>>>> >> -
>>> >> >>> >>>>> >> UTF-8)
>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed
>>> Delta
>>> >> >>> >>>>> >> Iteration))
>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>> switched
>>> >> >>> >>>>> >> to
>>> >> >>> >>>>> >> CANCELING
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Map
>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$2)
>>> >> >>> >>>>> >> (1/1)
>>> >> >>> >>>>> >> switched to CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to
>>> >> >>> >>>>> >> CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>>> >> >>> >>>>> >> CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>> >> >>> >>>>> >> UTF-8) -> Map
>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>> >> >>> >>>>> >> (1/1)
>>> >> >>> >>>>> >> switched to
>>> >> >>> >>>>> >> CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1)
>>> switched to
>>> >> >>> >>>>> >> CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>> >> >>> >>>>> >> ->
>>> >> >>> >>>>> >> Map
>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>> switched
>>> >> >>> >>>>> >> to
>>> >> >>> >>>>> >> CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
>>> (Unnamed
>>> >> >>> >>>>> >> Delta
>>> >> >>> >>>>> >> Iteration)) (1/1) switched to CANCELED
>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
>>> FAILED
>>> >> >>> >>>>> >> Error: The program execution failed:
>>> >> >>> >>>>> >> java.lang.IllegalStateException: This
>>> >> >>> >>>>> >> stub is not part of an iteration step function.
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>> >> >>> >>>>> >> at
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >>
>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>> >> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>>> >> >>> >>>>> >> <al...@gmail.com> wrote:
>>> >> >>> >>>>> >>>
>>> >> >>> >>>>> >>> Should work now.
>>> >> >>> >>>>> >>> Cheers
>>> >> >>> >>>>> >>>
>>> >> >>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>>> >> >>> >>>>> >>> <al...@gmail.com> wrote:
>>> >> >>> >>>>> >>>>
>>> >> >>> >>>>> >>>> Ok, thanks.
>>> >> >>> >>>>> >>>> Please let me know when it is fixed.
>>> >> >>> >>>>> >>>>
>>> >> >>> >>>>> >>>> Cheers
>>> >> >>> >>>>> >>>> Max
>>> >> >>> >>>>> >>>>
>>> >> >>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
>>> >> >>> >>>>> >>>> <se...@apache.org>
>>> >> >>> >>>>> >>>> wrote:
>>> >> >>> >>>>> >>>>>
>>> >> >>> >>>>> >>>>> Thank you, I will look into that...
>>> >> >>> >>>>> >>>>
>>> >> >>> >>>>> >>>>
>>> >> >>> >>>>> >>>
>>> >> >>> >>>>> >>
>>> >> >>> >>>>> >
>>> >> >>> >>>>
>>> >> >>> >>>>
>>> >> >>> >>>
>>> >> >>> >>
>>> >> >>> >
>>> >> >>
>>> >> >>
>>> >> >
>>> >
>>> >
>>>
>>
>>
>

Re: Forced to use Solution Set in Step Function

Posted by Stephan Ewen <se...@apache.org>.
Good to heat that it worked. No need to try out the other version, the same
fix is in there...

On Tue, Nov 11, 2014 at 2:07 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi Stephan!
>
> It worked with the development snapshot. I never tried it with the new
> release. If you want me to try it out, just say so.
>
> Cheers,
> Max
>
> On Mon, Nov 10, 2014 at 5:13 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> The 0.7.0 release should have fixed that problem. Have you had a chance
>> to try that out?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Oct 15, 2014 at 3:43 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> @Aljoscha Sorry, I just tried my workaround. There are some minor
>>> conceptual bugs (caused by the id's...). I attached the new version.
>>> Unfortunately there the compiler breaks. An issue is already open.
>>>
>>> On Wed, Oct 15, 2014 at 11:53 AM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Ok.
>>>>
>>>> Here is a input variant:
>>>> flink run -v bump_boost-0.1.jar -c bumpboost.Job x_file=X y_file=Y
>>>> out_file=/tmp/tmpnWYamw random_file=random_file dimensions=1 N=100
>>>> width_candidates_file=/tmp/tmpTJ4LDh iterations=30 multi_bump_boost=0
>>>> gradient_descent_iterations=30 cache=False min_width=-4 max_width=6
>>>> min_width_update=1e-08 max_width_update=10
>>>>
>>>> width_candidates_file is not needed by now. X, Y and random_file are
>>>> attached.
>>>>
>>>> If you have problems, running it, let me know!
>>>> Thanks!
>>>>
>>>> On Tue, Oct 14, 2014 at 10:40 PM, Aljoscha Krettek <aljoscha@apache.org
>>>> > wrote:
>>>>
>>>>> Transferring to variables: Unfortunately not possible right now but we
>>>>> are working on it.
>>>>>
>>>>> On Tue, Oct 14, 2014 at 8:53 PM, Maximilian Alber
>>>>> <al...@gmail.com> wrote:
>>>>> > @ Stephan: Thanks! So I gonna switch!
>>>>> >
>>>>> > Sorry, my bad. I will provide you some sample by tomorrow morning.
>>>>> >
>>>>> > Yes. Workaround, because I cannot transfer them into variables, can
>>>>> I by now
>>>>> > (or will I ever)?
>>>>> >
>>>>> > Maybe some explanation to my solution:
>>>>> > - X is for my a matrix of shape (N, d). Modeled in Flink as dataset
>>>>> of
>>>>> > vectors. Each Vector has an ID which is the row number and an array
>>>>> with
>>>>> > numbers, the actual row.
>>>>> > - Y is for my a matrix of shape (N, 1) thus actually a column-vector.
>>>>> > - old_sum is either a scalar if d == 1 or a row-vector aka matrix of
>>>>> shape
>>>>> > (1, N) or a Dataset with one Vector. (By now I have the convention
>>>>> to give
>>>>> > id -1 to them, comes from a former workaround...)
>>>>> >
>>>>> > The whole ID story comes from the fact that I need to know which
>>>>> stuff
>>>>> > belongs together in mathematical operations (see my zip function).
>>>>> You can
>>>>> > look that up in util.scala, that's kind of my math library. I don't
>>>>> want to
>>>>> > imagine the mess in Java :)
>>>>> >
>>>>> > Cheers
>>>>> > Max
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Tue, Oct 14, 2014 at 6:28 PM, Aljoscha Krettek <
>>>>> aljoscha@apache.org>
>>>>> > wrote:
>>>>> >>
>>>>> >> Could you maybe also give some examples for the input expected by
>>>>> your
>>>>> >> program?
>>>>> >>
>>>>> >> Also, the residual DataSet contains several Vectors while the sum
>>>>> (or
>>>>> >> old_sum) DataSet is always only contains 1 Vector. Correct?
>>>>> >>
>>>>> >> On Tue, Oct 14, 2014 at 6:04 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>> >> > BTW: The current master allows you to not join with the solution
>>>>> set,
>>>>> >> > and
>>>>> >> > only use it to accumulate data.
>>>>> >> >
>>>>> >> > On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber
>>>>> >> > <al...@gmail.com> wrote:
>>>>> >> >>
>>>>> >> >> Ok, that's possible too.
>>>>> >> >>
>>>>> >> >> VectorDataSet is just scala magic to ease my life (See below).
>>>>> If you
>>>>> >> >> want
>>>>> >> >> to take a look, I appended the package. The main code is in
>>>>> >> >> BumpBoost.scala.
>>>>> >> >> In util.scala is the vector stuff.
>>>>> >> >> Thanks!
>>>>> >> >>
>>>>> >> >> class VectorDataSet(X: DataSet[Vector]){
>>>>> >> >> def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
>>>>> >> >> def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
>>>>> >> >> def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
>>>>> >> >> def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>>>>> >> >>
>>>>> >> >> def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
>>>>> >> >> def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
>>>>> >> >> def sumV() = VectorDataSet.sumV(X)
>>>>> >> >> }
>>>>> >> >> object VectorDataSet {
>>>>> >> >> def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2
>>>>> map {x =>
>>>>> >> >> x._1 + x._2}
>>>>> >> >> def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2
>>>>> map {x
>>>>> >> >> =>
>>>>> >> >> x._1 - x._2}
>>>>> >> >> def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2
>>>>> map {x
>>>>> >> >> =>
>>>>> >> >> x._1 * x._2}
>>>>> >> >> def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2
>>>>> map {x =>
>>>>> >> >> x._1 / x._2}
>>>>> >> >>
>>>>> >> >> def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2
>>>>> where
>>>>> >> >> "id"
>>>>> >> >> equalTo "id"
>>>>> >> >> def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
>>>>> >> >> def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>>>>> >> >>
>>>>> >> >> implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new
>>>>> >> >> VectorDataSet(ds)
>>>>> >> >> }
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <
>>>>> aljoscha@apache.org>
>>>>> >> >> wrote:
>>>>> >> >>>
>>>>> >> >>> Maybe you could use the residual_2 data set as a broadcast
>>>>> dataset.
>>>>> >> >>> i.e. make in available in the operation that adds the residual
>>>>> for the
>>>>> >> >>> current iteration number to the old_sum. (I'm not sure what the
>>>>> >> >>> VectorDataSet.add() method does here). If you gave me the
>>>>> complete
>>>>> >> >>> code I could try finding an elegant solution to that problem.
>>>>> >> >>>
>>>>> >> >>> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <sewen@apache.org
>>>>> >
>>>>> >> >>> wrote:
>>>>> >> >>> > That is an interesting case. Everything that is loop
>>>>> invariant is
>>>>> >> >>> > computed
>>>>> >> >>> > once outside the loop. You are looking for a way to make this
>>>>> part
>>>>> >> >>> > of
>>>>> >> >>> > the
>>>>> >> >>> > loop.
>>>>> >> >>> >
>>>>> >> >>> > Can you try making the filter part of the
>>>>> >> >>> > "VectorDataSet.add(old_sum,
>>>>> >> >>> > current)" operation?
>>>>> >> >>> >
>>>>> >> >>> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
>>>>> >> >>> > <al...@gmail.com> wrote:
>>>>> >> >>> >>
>>>>> >> >>> >> The deltaiteration calculates the cumulative sum (prefix
>>>>> sum) of
>>>>> >> >>> >> residual_2.
>>>>> >> >>> >>
>>>>> >> >>> >> I'm not sure if got you idea. I could add residual_2 to the
>>>>> >> >>> >> workset.
>>>>> >> >>> >> But
>>>>> >> >>> >> in that case I need to separate the "old_sum"(the sum up to
>>>>> now)
>>>>> >> >>> >> and
>>>>> >> >>> >> residual_2 each iteration. Actually I had that before, then
>>>>> I tried
>>>>> >> >>> >> the
>>>>> >> >>> >> Scala closure to clean up the code.
>>>>> >> >>> >>
>>>>> >> >>> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <
>>>>> fhueske@apache.org>
>>>>> >> >>> >> wrote:
>>>>> >> >>> >>>
>>>>> >> >>> >>> Jep, I see you point.
>>>>> >> >>> >>> Conceptually, all data that changes and affects the result
>>>>> of an
>>>>> >> >>> >>> iteration should be part of the workset.
>>>>> >> >>> >>> Hence, the model kind of assumes that the datum
>>>>> "superStepNumber"
>>>>> >> >>> >>> should
>>>>> >> >>> >>> be part of the workset.
>>>>> >> >>> >>>
>>>>> >> >>> >>> I am not familiar with your application, but would it make
>>>>> sense
>>>>> >> >>> >>> to
>>>>> >> >>> >>> add
>>>>> >> >>> >>> the number as an additional attribute to the workset data
>>>>> set and
>>>>> >> >>> >>> increase
>>>>> >> >>> >>> it manually?
>>>>> >> >>> >>>
>>>>> >> >>> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber
>>>>> >> >>> >>> <al...@gmail.com>:
>>>>> >> >>> >>>>
>>>>> >> >>> >>>> Ok, sounds true, but somehow I would like to execute it
>>>>> inside of
>>>>> >> >>> >>>> it. So
>>>>> >> >>> >>>> I probably need to do some nonsense work to make it part
>>>>> of it?
>>>>> >> >>> >>>>
>>>>> >> >>> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek
>>>>> >> >>> >>>> <al...@apache.org>
>>>>> >> >>> >>>> wrote:
>>>>> >> >>> >>>>>
>>>>> >> >>> >>>>> Dammit you beat me to it. But yes, this is exactly what I
>>>>> was
>>>>> >> >>> >>>>> just
>>>>> >> >>> >>>>> writing.
>>>>> >> >>> >>>>>
>>>>> >> >>> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske
>>>>> >> >>> >>>>> <fh...@apache.org>
>>>>> >> >>> >>>>> wrote:
>>>>> >> >>> >>>>> > Hi,
>>>>> >> >>> >>>>> >
>>>>> >> >>> >>>>> > I'm not super familiar with the iterations, but my
>>>>> guess would
>>>>> >> >>> >>>>> > be
>>>>> >> >>> >>>>> > that the
>>>>> >> >>> >>>>> > filter is not evaluated as part of the iteration.
>>>>> >> >>> >>>>> > Since it is not connect to the workset, the filter is
>>>>> not part
>>>>> >> >>> >>>>> > of
>>>>> >> >>> >>>>> > the
>>>>> >> >>> >>>>> > loop
>>>>> >> >>> >>>>> > and evaluated once outside where no superset number is
>>>>> >> >>> >>>>> > available.
>>>>> >> >>> >>>>> > I guess, moving the filter outside of the loop gives
>>>>> the same
>>>>> >> >>> >>>>> > error.
>>>>> >> >>> >>>>> >
>>>>> >> >>> >>>>> > Cheers, Fabian
>>>>> >> >>> >>>>> >
>>>>> >> >>> >>>>> >
>>>>> >> >>> >>>>> >
>>>>> >> >>> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>>>>> >> >>> >>>>> > <al...@gmail.com>:
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> Hmm or maybe not. With this code I get some strange
>>>>> error:
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment)
>>>>> = {
>>>>> >> >>> >>>>> >> val X = env readTextFile config.xFile map
>>>>> >> >>> >>>>> >> {Vector.parseFromString(config.dimensions, _)};
>>>>> >> >>> >>>>> >> val residual = env readTextFile config.yFile map
>>>>> >> >>> >>>>> >> {Vector.parseFromString(_)};
>>>>> >> >>> >>>>> >> val randoms = env readTextFile config.randomFile map
>>>>> >> >>> >>>>> >> {Vector.parseFromString(_)}
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> val residual_2 = residual * residual
>>>>> >> >>> >>>>> >> val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>>> 0})
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>> >> >>> >>>>> >> val sumVector =
>>>>> >> >>> >>>>> >>
>>>>> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>>>> >> >>> >>>>> >> val cumSum = emptyDataSet.iterateDelta(sumVector,
>>>>> config.N,
>>>>> >> >>> >>>>> >> Array("id")) {
>>>>> >> >>> >>>>> >>     (solutionset, old_sum) =>
>>>>> >> >>> >>>>> >>     val current = residual_2 filter (new
>>>>> >> >>> >>>>> >> RichFilterFunction[Vector]{
>>>>> >> >>> >>>>> >>       def filter(x: Vector) = x.id ==
>>>>> >> >>> >>>>> >> (getIterationRuntimeContext.getSuperstepNumber)
>>>>> >> >>> >>>>> >>     })
>>>>> >> >>> >>>>> >>     val sum = VectorDataSet.add(old_sum, current)
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>     (sum map (new RichMapFunction[Vector, Vector]{
>>>>> >> >>> >>>>> >>       def map(x: Vector) = new
>>>>> >> >>> >>>>> >> Vector(getIterationRuntimeContext.getSuperstepNumber,
>>>>> >> >>> >>>>> >> x.values)
>>>>> >> >>> >>>>> >>     }),
>>>>> >> >>> >>>>> >>     sum)
>>>>> >> >>> >>>>> >> }
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> Error:
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status
>>>>> RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> SCHEDULED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> DEPLOYING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>>>> >> >>> >>>>> >> SCHEDULED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>>>> >> >>> >>>>> >> DEPLOYING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>>> >> >>> >>>>> >> (1/1)
>>>>> >> >>> >>>>> >> switched to
>>>>> >> >>> >>>>> >> SCHEDULED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>>> >> >>> >>>>> >> (1/1)
>>>>> >> >>> >>>>> >> switched to
>>>>> >> >>> >>>>> >> DEPLOYING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>>>> (Unnamed
>>>>> >> >>> >>>>> >> Delta
>>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to SCHEDULED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>>>> (Unnamed
>>>>> >> >>> >>>>> >> Delta
>>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to DEPLOYING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>>> >> >>> >>>>> >> (1/1)
>>>>> >> >>> >>>>> >> switched to
>>>>> >> >>> >>>>> >> RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>>>> >> >>> >>>>> >> RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>>>> (Unnamed
>>>>> >> >>> >>>>> >> Delta
>>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> SCHEDULED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> DEPLOYING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>>> switched
>>>>> >> >>> >>>>> >> to
>>>>> >> >>> >>>>> >> SCHEDULED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>>> switched
>>>>> >> >>> >>>>> >> to
>>>>> >> >>> >>>>> >> DEPLOYING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>>> switched
>>>>> >> >>> >>>>> >> to
>>>>> >> >>> >>>>> >> RUNNING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> FAILED
>>>>> >> >>> >>>>> >> java.lang.IllegalStateException: This stub is not part
>>>>> of an
>>>>> >> >>> >>>>> >> iteration
>>>>> >> >>> >>>>> >> step function.
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>>> >> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
>>>>> FAILING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>>>>> >> >>> >>>>> >> CANCELING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>>> >> >>> >>>>> >> (1/1)
>>>>> >> >>> >>>>> >> switched to
>>>>> >> >>> >>>>> >> CANCELING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> CANCELING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
>>>>> (Unnamed
>>>>> >> >>> >>>>> >> Delta
>>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to CANCELING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat
>>>>> >> >>> >>>>> >> (/tmp/tmplSYJ7S)
>>>>> >> >>> >>>>> >> -
>>>>> >> >>> >>>>> >> UTF-8)
>>>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed
>>>>> Delta
>>>>> >> >>> >>>>> >> Iteration))
>>>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>>> switched
>>>>> >> >>> >>>>> >> to
>>>>> >> >>> >>>>> >> CANCELING
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Map
>>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$2)
>>>>> >> >>> >>>>> >> (1/1)
>>>>> >> >>> >>>>> >> switched to CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched
>>>>> to
>>>>> >> >>> >>>>> >> CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>>>>> >> >>> >>>>> >> CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>>> >> >>> >>>>> >> (1/1)
>>>>> >> >>> >>>>> >> switched to
>>>>> >> >>> >>>>> >> CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1)
>>>>> switched to
>>>>> >> >>> >>>>> >> CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>>> >> >>> >>>>> >> ->
>>>>> >> >>> >>>>> >> Map
>>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>>> switched
>>>>> >> >>> >>>>> >> to
>>>>> >> >>> >>>>> >> CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
>>>>> (Unnamed
>>>>> >> >>> >>>>> >> Delta
>>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to CANCELED
>>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
>>>>> FAILED
>>>>> >> >>> >>>>> >> Error: The program execution failed:
>>>>> >> >>> >>>>> >> java.lang.IllegalStateException: This
>>>>> >> >>> >>>>> >> stub is not part of an iteration step function.
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>> >> >>> >>>>> >> at
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >>
>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>>> >> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>>>>> >> >>> >>>>> >> <al...@gmail.com> wrote:
>>>>> >> >>> >>>>> >>>
>>>>> >> >>> >>>>> >>> Should work now.
>>>>> >> >>> >>>>> >>> Cheers
>>>>> >> >>> >>>>> >>>
>>>>> >> >>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>>>>> >> >>> >>>>> >>> <al...@gmail.com> wrote:
>>>>> >> >>> >>>>> >>>>
>>>>> >> >>> >>>>> >>>> Ok, thanks.
>>>>> >> >>> >>>>> >>>> Please let me know when it is fixed.
>>>>> >> >>> >>>>> >>>>
>>>>> >> >>> >>>>> >>>> Cheers
>>>>> >> >>> >>>>> >>>> Max
>>>>> >> >>> >>>>> >>>>
>>>>> >> >>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
>>>>> >> >>> >>>>> >>>> <se...@apache.org>
>>>>> >> >>> >>>>> >>>> wrote:
>>>>> >> >>> >>>>> >>>>>
>>>>> >> >>> >>>>> >>>>> Thank you, I will look into that...
>>>>> >> >>> >>>>> >>>>
>>>>> >> >>> >>>>> >>>>
>>>>> >> >>> >>>>> >>>
>>>>> >> >>> >>>>> >>
>>>>> >> >>> >>>>> >
>>>>> >> >>> >>>>
>>>>> >> >>> >>>>
>>>>> >> >>> >>>
>>>>> >> >>> >>
>>>>> >> >>> >
>>>>> >> >>
>>>>> >> >>
>>>>> >> >
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Forced to use Solution Set in Step Function

Posted by Maximilian Alber <al...@gmail.com>.
Hi Stephan!

It worked with the development snapshot. I never tried it with the new
release. If you want me to try it out, just say so.

Cheers,
Max

On Mon, Nov 10, 2014 at 5:13 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> The 0.7.0 release should have fixed that problem. Have you had a chance to
> try that out?
>
> Greetings,
> Stephan
>
>
> On Wed, Oct 15, 2014 at 3:43 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> @Aljoscha Sorry, I just tried my workaround. There are some minor
>> conceptual bugs (caused by the id's...). I attached the new version.
>> Unfortunately there the compiler breaks. An issue is already open.
>>
>> On Wed, Oct 15, 2014 at 11:53 AM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok.
>>>
>>> Here is a input variant:
>>> flink run -v bump_boost-0.1.jar -c bumpboost.Job x_file=X y_file=Y
>>> out_file=/tmp/tmpnWYamw random_file=random_file dimensions=1 N=100
>>> width_candidates_file=/tmp/tmpTJ4LDh iterations=30 multi_bump_boost=0
>>> gradient_descent_iterations=30 cache=False min_width=-4 max_width=6
>>> min_width_update=1e-08 max_width_update=10
>>>
>>> width_candidates_file is not needed by now. X, Y and random_file are
>>> attached.
>>>
>>> If you have problems, running it, let me know!
>>> Thanks!
>>>
>>> On Tue, Oct 14, 2014 at 10:40 PM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Transferring to variables: Unfortunately not possible right now but we
>>>> are working on it.
>>>>
>>>> On Tue, Oct 14, 2014 at 8:53 PM, Maximilian Alber
>>>> <al...@gmail.com> wrote:
>>>> > @ Stephan: Thanks! So I gonna switch!
>>>> >
>>>> > Sorry, my bad. I will provide you some sample by tomorrow morning.
>>>> >
>>>> > Yes. Workaround, because I cannot transfer them into variables, can I
>>>> by now
>>>> > (or will I ever)?
>>>> >
>>>> > Maybe some explanation to my solution:
>>>> > - X is for my a matrix of shape (N, d). Modeled in Flink as dataset of
>>>> > vectors. Each Vector has an ID which is the row number and an array
>>>> with
>>>> > numbers, the actual row.
>>>> > - Y is for my a matrix of shape (N, 1) thus actually a column-vector.
>>>> > - old_sum is either a scalar if d == 1 or a row-vector aka matrix of
>>>> shape
>>>> > (1, N) or a Dataset with one Vector. (By now I have the convention to
>>>> give
>>>> > id -1 to them, comes from a former workaround...)
>>>> >
>>>> > The whole ID story comes from the fact that I need to know which stuff
>>>> > belongs together in mathematical operations (see my zip function).
>>>> You can
>>>> > look that up in util.scala, that's kind of my math library. I don't
>>>> want to
>>>> > imagine the mess in Java :)
>>>> >
>>>> > Cheers
>>>> > Max
>>>> >
>>>> >
>>>> >
>>>> > On Tue, Oct 14, 2014 at 6:28 PM, Aljoscha Krettek <
>>>> aljoscha@apache.org>
>>>> > wrote:
>>>> >>
>>>> >> Could you maybe also give some examples for the input expected by
>>>> your
>>>> >> program?
>>>> >>
>>>> >> Also, the residual DataSet contains several Vectors while the sum (or
>>>> >> old_sum) DataSet is always only contains 1 Vector. Correct?
>>>> >>
>>>> >> On Tue, Oct 14, 2014 at 6:04 PM, Stephan Ewen <se...@apache.org>
>>>> wrote:
>>>> >> > BTW: The current master allows you to not join with the solution
>>>> set,
>>>> >> > and
>>>> >> > only use it to accumulate data.
>>>> >> >
>>>> >> > On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber
>>>> >> > <al...@gmail.com> wrote:
>>>> >> >>
>>>> >> >> Ok, that's possible too.
>>>> >> >>
>>>> >> >> VectorDataSet is just scala magic to ease my life (See below). If
>>>> you
>>>> >> >> want
>>>> >> >> to take a look, I appended the package. The main code is in
>>>> >> >> BumpBoost.scala.
>>>> >> >> In util.scala is the vector stuff.
>>>> >> >> Thanks!
>>>> >> >>
>>>> >> >> class VectorDataSet(X: DataSet[Vector]){
>>>> >> >> def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
>>>> >> >> def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
>>>> >> >> def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
>>>> >> >> def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>>>> >> >>
>>>> >> >> def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
>>>> >> >> def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
>>>> >> >> def sumV() = VectorDataSet.sumV(X)
>>>> >> >> }
>>>> >> >> object VectorDataSet {
>>>> >> >> def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map
>>>> {x =>
>>>> >> >> x._1 + x._2}
>>>> >> >> def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2
>>>> map {x
>>>> >> >> =>
>>>> >> >> x._1 - x._2}
>>>> >> >> def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2
>>>> map {x
>>>> >> >> =>
>>>> >> >> x._1 * x._2}
>>>> >> >> def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map
>>>> {x =>
>>>> >> >> x._1 / x._2}
>>>> >> >>
>>>> >> >> def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2
>>>> where
>>>> >> >> "id"
>>>> >> >> equalTo "id"
>>>> >> >> def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
>>>> >> >> def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>>>> >> >>
>>>> >> >> implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new
>>>> >> >> VectorDataSet(ds)
>>>> >> >> }
>>>> >> >>
>>>> >> >>
>>>> >> >> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <
>>>> aljoscha@apache.org>
>>>> >> >> wrote:
>>>> >> >>>
>>>> >> >>> Maybe you could use the residual_2 data set as a broadcast
>>>> dataset.
>>>> >> >>> i.e. make in available in the operation that adds the residual
>>>> for the
>>>> >> >>> current iteration number to the old_sum. (I'm not sure what the
>>>> >> >>> VectorDataSet.add() method does here). If you gave me the
>>>> complete
>>>> >> >>> code I could try finding an elegant solution to that problem.
>>>> >> >>>
>>>> >> >>> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <se...@apache.org>
>>>> >> >>> wrote:
>>>> >> >>> > That is an interesting case. Everything that is loop invariant
>>>> is
>>>> >> >>> > computed
>>>> >> >>> > once outside the loop. You are looking for a way to make this
>>>> part
>>>> >> >>> > of
>>>> >> >>> > the
>>>> >> >>> > loop.
>>>> >> >>> >
>>>> >> >>> > Can you try making the filter part of the
>>>> >> >>> > "VectorDataSet.add(old_sum,
>>>> >> >>> > current)" operation?
>>>> >> >>> >
>>>> >> >>> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
>>>> >> >>> > <al...@gmail.com> wrote:
>>>> >> >>> >>
>>>> >> >>> >> The deltaiteration calculates the cumulative sum (prefix sum)
>>>> of
>>>> >> >>> >> residual_2.
>>>> >> >>> >>
>>>> >> >>> >> I'm not sure if got you idea. I could add residual_2 to the
>>>> >> >>> >> workset.
>>>> >> >>> >> But
>>>> >> >>> >> in that case I need to separate the "old_sum"(the sum up to
>>>> now)
>>>> >> >>> >> and
>>>> >> >>> >> residual_2 each iteration. Actually I had that before, then I
>>>> tried
>>>> >> >>> >> the
>>>> >> >>> >> Scala closure to clean up the code.
>>>> >> >>> >>
>>>> >> >>> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <
>>>> fhueske@apache.org>
>>>> >> >>> >> wrote:
>>>> >> >>> >>>
>>>> >> >>> >>> Jep, I see you point.
>>>> >> >>> >>> Conceptually, all data that changes and affects the result
>>>> of an
>>>> >> >>> >>> iteration should be part of the workset.
>>>> >> >>> >>> Hence, the model kind of assumes that the datum
>>>> "superStepNumber"
>>>> >> >>> >>> should
>>>> >> >>> >>> be part of the workset.
>>>> >> >>> >>>
>>>> >> >>> >>> I am not familiar with your application, but would it make
>>>> sense
>>>> >> >>> >>> to
>>>> >> >>> >>> add
>>>> >> >>> >>> the number as an additional attribute to the workset data
>>>> set and
>>>> >> >>> >>> increase
>>>> >> >>> >>> it manually?
>>>> >> >>> >>>
>>>> >> >>> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber
>>>> >> >>> >>> <al...@gmail.com>:
>>>> >> >>> >>>>
>>>> >> >>> >>>> Ok, sounds true, but somehow I would like to execute it
>>>> inside of
>>>> >> >>> >>>> it. So
>>>> >> >>> >>>> I probably need to do some nonsense work to make it part of
>>>> it?
>>>> >> >>> >>>>
>>>> >> >>> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek
>>>> >> >>> >>>> <al...@apache.org>
>>>> >> >>> >>>> wrote:
>>>> >> >>> >>>>>
>>>> >> >>> >>>>> Dammit you beat me to it. But yes, this is exactly what I
>>>> was
>>>> >> >>> >>>>> just
>>>> >> >>> >>>>> writing.
>>>> >> >>> >>>>>
>>>> >> >>> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske
>>>> >> >>> >>>>> <fh...@apache.org>
>>>> >> >>> >>>>> wrote:
>>>> >> >>> >>>>> > Hi,
>>>> >> >>> >>>>> >
>>>> >> >>> >>>>> > I'm not super familiar with the iterations, but my guess
>>>> would
>>>> >> >>> >>>>> > be
>>>> >> >>> >>>>> > that the
>>>> >> >>> >>>>> > filter is not evaluated as part of the iteration.
>>>> >> >>> >>>>> > Since it is not connect to the workset, the filter is
>>>> not part
>>>> >> >>> >>>>> > of
>>>> >> >>> >>>>> > the
>>>> >> >>> >>>>> > loop
>>>> >> >>> >>>>> > and evaluated once outside where no superset number is
>>>> >> >>> >>>>> > available.
>>>> >> >>> >>>>> > I guess, moving the filter outside of the loop gives the
>>>> same
>>>> >> >>> >>>>> > error.
>>>> >> >>> >>>>> >
>>>> >> >>> >>>>> > Cheers, Fabian
>>>> >> >>> >>>>> >
>>>> >> >>> >>>>> >
>>>> >> >>> >>>>> >
>>>> >> >>> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>>>> >> >>> >>>>> > <al...@gmail.com>:
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> Hmm or maybe not. With this code I get some strange
>>>> error:
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment) =
>>>> {
>>>> >> >>> >>>>> >> val X = env readTextFile config.xFile map
>>>> >> >>> >>>>> >> {Vector.parseFromString(config.dimensions, _)};
>>>> >> >>> >>>>> >> val residual = env readTextFile config.yFile map
>>>> >> >>> >>>>> >> {Vector.parseFromString(_)};
>>>> >> >>> >>>>> >> val randoms = env readTextFile config.randomFile map
>>>> >> >>> >>>>> >> {Vector.parseFromString(_)}
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> val residual_2 = residual * residual
>>>> >> >>> >>>>> >> val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>> 0})
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>> >> >>> >>>>> >> val sumVector =
>>>> >> >>> >>>>> >> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>>> >> >>> >>>>> >> val cumSum = emptyDataSet.iterateDelta(sumVector,
>>>> config.N,
>>>> >> >>> >>>>> >> Array("id")) {
>>>> >> >>> >>>>> >>     (solutionset, old_sum) =>
>>>> >> >>> >>>>> >>     val current = residual_2 filter (new
>>>> >> >>> >>>>> >> RichFilterFunction[Vector]{
>>>> >> >>> >>>>> >>       def filter(x: Vector) = x.id ==
>>>> >> >>> >>>>> >> (getIterationRuntimeContext.getSuperstepNumber)
>>>> >> >>> >>>>> >>     })
>>>> >> >>> >>>>> >>     val sum = VectorDataSet.add(old_sum, current)
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>     (sum map (new RichMapFunction[Vector, Vector]{
>>>> >> >>> >>>>> >>       def map(x: Vector) = new
>>>> >> >>> >>>>> >> Vector(getIterationRuntimeContext.getSuperstepNumber,
>>>> >> >>> >>>>> >> x.values)
>>>> >> >>> >>>>> >>     }),
>>>> >> >>> >>>>> >>     sum)
>>>> >> >>> >>>>> >> }
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> Error:
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status
>>>> RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>>> switched to
>>>> >> >>> >>>>> >> SCHEDULED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>>> switched to
>>>> >> >>> >>>>> >> DEPLOYING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>>> >> >>> >>>>> >> SCHEDULED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>>> >> >>> >>>>> >> DEPLOYING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>> >> >>> >>>>> >> (1/1)
>>>> >> >>> >>>>> >> switched to
>>>> >> >>> >>>>> >> SCHEDULED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>> >> >>> >>>>> >> (1/1)
>>>> >> >>> >>>>> >> switched to
>>>> >> >>> >>>>> >> DEPLOYING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1)
>>>> switched to
>>>> >> >>> >>>>> >> RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>>> (Unnamed
>>>> >> >>> >>>>> >> Delta
>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to SCHEDULED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>>> (Unnamed
>>>> >> >>> >>>>> >> Delta
>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to DEPLOYING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>> >> >>> >>>>> >> (1/1)
>>>> >> >>> >>>>> >> switched to
>>>> >> >>> >>>>> >> RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to
>>>> >> >>> >>>>> >> RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
>>>> (Unnamed
>>>> >> >>> >>>>> >> Delta
>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>>> to
>>>> >> >>> >>>>> >> SCHEDULED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>>> to
>>>> >> >>> >>>>> >> DEPLOYING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>> switched
>>>> >> >>> >>>>> >> to
>>>> >> >>> >>>>> >> SCHEDULED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>> switched
>>>> >> >>> >>>>> >> to
>>>> >> >>> >>>>> >> DEPLOYING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>>> to
>>>> >> >>> >>>>> >> RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>> switched
>>>> >> >>> >>>>> >> to
>>>> >> >>> >>>>> >> RUNNING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
>>>> to
>>>> >> >>> >>>>> >> FAILED
>>>> >> >>> >>>>> >> java.lang.IllegalStateException: This stub is not part
>>>> of an
>>>> >> >>> >>>>> >> iteration
>>>> >> >>> >>>>> >> step function.
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>> >> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
>>>> FAILING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>>>> >> >>> >>>>> >> CANCELING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>> >> >>> >>>>> >> (1/1)
>>>> >> >>> >>>>> >> switched to
>>>> >> >>> >>>>> >> CANCELING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1)
>>>> switched to
>>>> >> >>> >>>>> >> CANCELING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
>>>> (Unnamed
>>>> >> >>> >>>>> >> Delta
>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to CANCELING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat
>>>> >> >>> >>>>> >> (/tmp/tmplSYJ7S)
>>>> >> >>> >>>>> >> -
>>>> >> >>> >>>>> >> UTF-8)
>>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed
>>>> Delta
>>>> >> >>> >>>>> >> Iteration))
>>>> >> >>> >>>>> >> (1/1) switched to CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>> switched
>>>> >> >>> >>>>> >> to
>>>> >> >>> >>>>> >> CANCELING
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Map
>>>> >> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$2)
>>>> >> >>> >>>>> >> (1/1)
>>>> >> >>> >>>>> >> switched to CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to
>>>> >> >>> >>>>> >> CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>>>> >> >>> >>>>> >> CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>>>> >> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>>>> >> >>> >>>>> >> UTF-8) -> Map
>>>> (org.apache.flink.api.scala.DataSet$$anon$1)
>>>> >> >>> >>>>> >> (1/1)
>>>> >> >>> >>>>> >> switched to
>>>> >> >>> >>>>> >> CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1)
>>>> switched to
>>>> >> >>> >>>>> >> CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>>>> >> >>> >>>>> >> ->
>>>> >> >>> >>>>> >> Map
>>>> >> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
>>>> switched
>>>> >> >>> >>>>> >> to
>>>> >> >>> >>>>> >> CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
>>>> (Unnamed
>>>> >> >>> >>>>> >> Delta
>>>> >> >>> >>>>> >> Iteration)) (1/1) switched to CANCELED
>>>> >> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
>>>> FAILED
>>>> >> >>> >>>>> >> Error: The program execution failed:
>>>> >> >>> >>>>> >> java.lang.IllegalStateException: This
>>>> >> >>> >>>>> >> stub is not part of an iteration step function.
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>> >> >>> >>>>> >> at
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >>
>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>> >> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>>>> >> >>> >>>>> >> <al...@gmail.com> wrote:
>>>> >> >>> >>>>> >>>
>>>> >> >>> >>>>> >>> Should work now.
>>>> >> >>> >>>>> >>> Cheers
>>>> >> >>> >>>>> >>>
>>>> >> >>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>>>> >> >>> >>>>> >>> <al...@gmail.com> wrote:
>>>> >> >>> >>>>> >>>>
>>>> >> >>> >>>>> >>>> Ok, thanks.
>>>> >> >>> >>>>> >>>> Please let me know when it is fixed.
>>>> >> >>> >>>>> >>>>
>>>> >> >>> >>>>> >>>> Cheers
>>>> >> >>> >>>>> >>>> Max
>>>> >> >>> >>>>> >>>>
>>>> >> >>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
>>>> >> >>> >>>>> >>>> <se...@apache.org>
>>>> >> >>> >>>>> >>>> wrote:
>>>> >> >>> >>>>> >>>>>
>>>> >> >>> >>>>> >>>>> Thank you, I will look into that...
>>>> >> >>> >>>>> >>>>
>>>> >> >>> >>>>> >>>>
>>>> >> >>> >>>>> >>>
>>>> >> >>> >>>>> >>
>>>> >> >>> >>>>> >
>>>> >> >>> >>>>
>>>> >> >>> >>>>
>>>> >> >>> >>>
>>>> >> >>> >>
>>>> >> >>> >
>>>> >> >>
>>>> >> >>
>>>> >> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>