Posted to user@beam.apache.org by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com> on 2017/02/02 15:04:07 UTC

possible reasons for exception "Cannot move input watermark time backwards from"

Hi @all!

I'm using Beam 0.4.0 and only its batch processing features.
While executing the pipeline I get an exception: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
First, since I'm not using the streaming features, I'm wondering why watermarks are involved at all (but this may be a Beam-internal thing, I don't know).
Second, the timestamps stated in the exception message are really weird and look a bit like an overflow in a long value to me.
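
A note on the two instants: they are probably not an arithmetic overflow. They look like the timestamp bounds of the Beam Java SDK. The sketch below reproduces them with java.time only. The class and method names are made up for illustration, and the derivation (Long.MIN_VALUE and Long.MAX_VALUE microseconds converted to milliseconds, with one day subtracted for the end of the global window) is an assumption about how the SDK defines its constants, not something stated in this thread.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Beam. It rebuilds the two instants from the
// exception message under the assumption described above.
final class WatermarkBounds {

    // Assumed "beginning of time": Long.MIN_VALUE microseconds, expressed in milliseconds.
    static Instant minTimestamp() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
    }

    // Assumed "end of time": Long.MAX_VALUE microseconds, expressed in milliseconds.
    static Instant maxTimestamp() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
    }

    // Assumed end of the global window: one day before the maximum timestamp.
    static Instant endOfGlobalWindow() {
        return maxTimestamp().minus(Duration.ofDays(1));
    }

    public static void main(String[] args) {
        // java.time prefixes years above 9999 with '+', which Joda-Time does not.
        System.out.println("min timestamp:        " + minTimestamp());
        System.out.println("end of global window: " + endOfGlobalWindow());
    }
}
```

If that assumption holds, the message reads as "the watermark was already at the end of the global window and something tried to reset it to the beginning of time".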

Does anyone have a clue what the reason for this exception could be?

Thanks,
Rico.


Full Stacktrace:
2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
        at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


________________________________


GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO), David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting This email and any attachments may contain confidential or privileged information. Please note that unauthorized copying, disclosure or distribution of the material in this email is not permitted.

Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by Amit Sela <am...@gmail.com>.
@Rico: filed BEAM-1395 <https://issues.apache.org/jira/browse/BEAM-1395>.
This should be sorted soon enough.

Thanks for reporting this issue!

On Fri, Feb 3, 2017 at 4:31 PM Amit Sela <am...@gmail.com> wrote:

> OK, this is indeed a different stack trace. The problem is now in
> SparkGroupAlsoByWindowFn, which did not exist in 0.4.0 and which I hoped
> would fix any issues you've encountered.
>
> More questions: is your data timestamped? Is your pipeline aware of the
> timestamp fields (using DoFn#outputWithTimestamp or a source that defines
> the timestamp)?
>
> Looks like this
> <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138> is
> broken anyway; I don't think there's actually a time-order guarantee when
> processing a partition. Could you open a ticket please? Thanks!
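
The ordering concern can be illustrated without any Beam code. The sketch below is hypothetical and is not what SparkGroupAlsoByWindowFn does. It only shows that elements iterated from a partition need not arrive in timestamp order, and that folding to the latest timestamp gives the same answer regardless of iteration order. All names are invented for the example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Beam or Spark code.
final class PartitionOrder {

    // True only if every timestamp is >= its predecessor in iteration order.
    static boolean isTimeOrdered(List<Long> timestampsMillis) {
        for (int i = 1; i < timestampsMillis.size(); i++) {
            if (timestampsMillis.get(i) < timestampsMillis.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    // Order-insensitive fold: the latest timestamp seen, or Long.MIN_VALUE if empty.
    static long latestTimestamp(List<Long> timestampsMillis) {
        long latest = Long.MIN_VALUE;
        for (long t : timestampsMillis) {
            latest = Math.max(latest, t);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Long> asIterated = Arrays.asList(30L, 10L, 20L);
        System.out.println("time-ordered: " + isTimeOrdered(asIterated));
        System.out.println("latest:       " + latestTimestamp(asIterated));
    }
}
```

Any logic that moves a monotonic clock forward element by element would need the first property to hold. The second function does not depend on it.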
>
> On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
>
>
> Due to restrictions in my contract I cannot show you the pipeline. It's a
> very complex one that we have been working on for several months already,
> also with Beam 0.4.0.
>
> Interestingly, we already ran our pipeline successfully with that
> version. Now, in a series of 30 executions, about 20 hit this exception
> and the others succeed…
>
>
>
>
>
> The full StackTrace
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>         at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
>         at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
>         at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Friday, 3 February 2017 13:10
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Is it the exact same stack trace? Would you mind sharing the stack trace
> and the pipeline?
>
> Thanks,
> Amit
>
>
>
> On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
> Hi!
>
>
>
> Thanks for the insights.
>
>
>
> As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran
> into the same error … :(
>
>
>
> Any further ideas or suggestions?
>
>
>
> Best,
>
> Rico.
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Thursday, 2 February 2017 17:15
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Hi Rico,
>
>
>
> Batch sort of uses watermarks: the "start watermark" sits at the
> beginning of time and the "end watermark" at the end of time (this is the
> "overflow" you see). Or, in a more "Beam" way: the watermark starts at the
> beginning of time, and after processing all the elements it jumps to the
> end of time because we *know* there are no more elements left to process.
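
The behaviour described here can be sketched as a clock that is only allowed to move forward. This is a minimal illustration with invented names, not Beam's InMemoryTimerInternals. It keeps a watermark in milliseconds, accepts the single batch jump from the beginning to the end of time, and rejects any later attempt to go back with a message modelled on the one in the stack trace.

```java
// Hypothetical sketch of a monotonic input watermark, not Beam's implementation.
final class MonotonicWatermark {

    private long currentMillis;

    MonotonicWatermark(long startMillis) {
        this.currentMillis = startMillis;
    }

    // Moves the watermark forward; refuses to move it backwards.
    void advanceTo(long newMillis) {
        if (newMillis < currentMillis) {
            throw new IllegalStateException(
                "Cannot move input watermark time backwards from "
                    + currentMillis + " to " + newMillis);
        }
        currentMillis = newMillis;
    }

    long current() {
        return currentMillis;
    }

    public static void main(String[] args) {
        // Stand-ins for "beginning of time" and "end of time" in a batch run.
        long start = Long.MIN_VALUE;
        long end = Long.MAX_VALUE;

        MonotonicWatermark watermark = new MonotonicWatermark(start);
        watermark.advanceTo(end); // all input processed: jump to the end of time

        try {
            watermark.advanceTo(start); // e.g. state reused for another batch of input
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the exception only appears when the same watermark state is advanced to the end of time and then asked to start over, which fits the "from end of time to beginning of time" values in the report. Whether that is what happens inside the runner is not established in this message.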
>
>
>
> Could you try 0.5.0-SNAPSHOT please? There was a large refactor around
> that area in the Spark runner, and a release is on the way, so 0.5.0 should
> be available within a few days anyway.
>
>
>
> Thanks,
>
> Amit
>

Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by Amit Sela <am...@gmail.com>.
Hi Rico,

I already opened two tickets: BEAM-1395
<https://issues.apache.org/jira/browse/BEAM-1395> and BEAM-1396
<https://issues.apache.org/jira/browse/BEAM-1396> (Spark runner and SDK),
and you can follow along on the PRs as well if you'd like:

https://github.com/apache/beam/pull/1922
https://github.com/apache/beam/pull/1924

Thanks!

On Mon, Feb 6, 2017 at 1:28 PM Bergmann, Rico (GfK External) <
Rico.Bergmann@ext.gfk.com> wrote:

> https://issues.apache.org/jira/browse/BEAM-1403
>
>
>
> *From:* Bergmann, Rico (GfK External) [mailto:Rico.Bergmann@ext.gfk.com]
> *Sent:* Monday, 6 February 2017 12:19
> *To:* user@beam.apache.org
> *Subject:* RE: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> See below
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Friday, 3 February 2017 15:31
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> OK, this is indeed a different stack trace. The problem is now in
> SparkGroupAlsoByWindowFn, which did not exist in 0.4.0 and which I hoped
> would fix any issues you've encountered.
>
>
>
> More questions: is your data timestamped?
>
> Yes (but only internally).
>
> Is your pipeline aware of the timestamp fields (using
> DoFn#outputWithTimestamp or a source that defines the timestamp)?
>
> No, we do not expose timestamps to the pipeline. For the pipeline these
> are simply fields in a record.
>
>
>
> Looks like this
> <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138> is
> broken anyway; I don't think there's actually a time-order guarantee when
> processing a partition. Could you open a ticket please? Thanks!
>
>
>
> I’ll do this!
>
>
>
> On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
>
>
> Due to restricitions in my contract I can not show you the pipeline. But
> it’s a very complex we are work on for several months already. Also with
> Beam 0.4.0
>
>
>
> Interesting to note is, that we already successfully ran our pipeline with
> that version. Now in a series of 30 executions about 20 get this exception,
> the others succeed…
>
>
>
>
>
> The full StackTrace
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> Caused by: java.lang.IllegalStateException: Cannot move input watermark
> time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>
>         at
> org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
>
>         at
> org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
>
>         at
> org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
>         at
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Friday, February 3, 2017 13:10
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Is it the exact same stack trace? Would you mind sharing the stack trace
> and the pipeline?
>
> Thanks,
> Amit
>
>
>
> On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
> Hi!
>
>
>
> Thanks for the insights.
>
>
>
> As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran
> into the same error … ☹
>
>
>
> Any further ideas or suggestions?
>
>
>
> Best,
>
> Rico.
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Thursday, February 2, 2017 17:15
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Hi Rico,
>
>
>
> Batch sort of uses watermarks by noting the "start watermark" at the
> beginning of time and the "end watermark" at the end of time (this is the
> "overflow" you see). Or, in a more "Beam" way: the watermark at the
> beginning is the start of time, and after processing all the elements the
> watermark jumps to the end of time because we *know* there are no more
> elements left to process.
>
>
>
> Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around
> that area in the Spark runner, and a release is on the way, so 0.5.0 should
> be available within a few days anyway.
>
>
>
> Thanks,
>
> Amit
>
>
>
> On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
> Hi @all!
>
>
>
> I’m using Beam 0.4.0 and only the batch processing features of it.
>
> While executing the pipeline I get an exception: Cannot move input
> watermark time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
> First, since I’m not using the streaming features I’m wondering about
> watermarks (but this may be a Beam-internal thing, I don’t know).
>
> Second, the timestamp stated in the exception message is really weird and
> looks a bit like an overflow in a long value to me.
>
>
>
> Does anyone have a clue what the reason for this exception could be?
>
>
>
> Thanks,
>
> Rico.
>
>
>
>
>
> Full Stacktrace:
>
> 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>         at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
>         at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
>         at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>         at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> ------------------------------
>
>
>
> GfK SE, Nuremberg, Germany, commercial register at the local court
> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
> David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf
> Klein-Bölting This email and any attachments may contain confidential or
> privileged information. Please note that unauthorized copying, disclosure
> or distribution of the material in this email is not permitted.
>
>

AW: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com>.
https://issues.apache.org/jira/browse/BEAM-1403

From: Bergmann, Rico (GfK External) [mailto:Rico.Bergmann@ext.gfk.com]
Sent: Monday, February 6, 2017 12:19
To: user@beam.apache.org
Subject: AW: possible reasons for exception "Cannot move input watermark time backwards from"

See below

From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Friday, February 3, 2017 15:31
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

OK, this is indeed a different stack trace - the problem now is in SparkGroupAlsoByWindowFn, which did not exist in 0.4.0 and which I hoped would fix any issues you've encountered.

More questions: is your data timestamped?
Yes (but only internally).
Is your pipeline aware of the timestamp fields (using DoFn#outputWithTimestamp or a source that defines the timestamp)?
No, we do not expose timestamps to the pipeline. For the pipeline these are simply fields in a record.

Looks like this <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138> is broken anyway; I don't think there is actually a time-order guarantee when processing a partition. Could you open a ticket, please? Thanks!

I’ll do this!
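The concern about time order can be illustrated with a small, self-contained sketch. It is not the code of SparkGroupAlsoByWindowFn or InMemoryTimerInternals; the class MonotonicWatermark and its methods are hypothetical names invented here. It only assumes what the exception message states, namely that the input watermark is never allowed to move backwards. Advancing such a watermark once per element works only if elements arrive in timestamp order, whereas advancing it once to the maximum timestamp does not depend on order.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not Beam's implementation.
final class MonotonicWatermark {
    private long current = Long.MIN_VALUE;

    // Rejects any attempt to move the watermark backwards, like the check in the trace.
    void advanceTo(long newWatermark) {
        if (newWatermark < current) {
            throw new IllegalStateException(
                "Cannot move input watermark time backwards from " + current + " to " + newWatermark);
        }
        current = newWatermark;
    }

    long get() {
        return current;
    }

    // Naive strategy: advance once per element, in arrival order.
    // Throws as soon as an element is older than its predecessor.
    static long advancePerElement(List<Long> timestamps) {
        MonotonicWatermark watermark = new MonotonicWatermark();
        for (long timestamp : timestamps) {
            watermark.advanceTo(timestamp);
        }
        return watermark.get();
    }

    // Order-independent strategy: advance once, to the largest timestamp seen.
    static long advanceToMax(List<Long> timestamps) {
        MonotonicWatermark watermark = new MonotonicWatermark();
        if (!timestamps.isEmpty()) {
            watermark.advanceTo(Collections.max(timestamps));
        }
        return watermark.get();
    }

    public static void main(String[] args) {
        List<Long> unordered = Arrays.asList(30L, 10L, 20L);
        System.out.println("advanceToMax: " + advanceToMax(unordered));
        try {
            advancePerElement(unordered);
        } catch (IllegalStateException e) {
            System.out.println("advancePerElement failed: " + e.getMessage());
        }
    }
}
```

If elements within a partition really can arrive in arbitrary order, the per-element strategy would fail only on some runs, which would be consistent with the intermittent failures reported in this thread, but that connection is a guess rather than a confirmed diagnosis.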

On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:

Due to restrictions in my contract I cannot show you the pipeline. But it’s a very complex one that we have been working on for several months already, also with Beam 0.4.0.

It is interesting to note that we already ran our pipeline successfully with that version. Now, in a series of 30 executions, about 20 get this exception while the others succeed…


The full stack trace:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Friday, February 3, 2017 13:10
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"


Is it the exact same stack trace? Would you mind sharing the stack trace and the pipeline?

Thanks,
Amit

On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi!

Thanks for the insights.

As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error … ☹

Any further ideas or suggestions?

Best,
Rico.

From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Thursday, February 2, 2017 17:15
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

Hi Rico,

Batch sort of uses watermarks by noting the "start watermark" at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Or, in a more "Beam" way: the watermark at the beginning is the start of time, and after processing all the elements the watermark jumps to the end of time because we know there are no more elements left to process.
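To make the "overflow" concrete, here is a minimal sketch in plain Java. It does not use Beam itself, and the class name WatermarkBounds and its members are invented for illustration. It assumes that the Beam Java SDK defines its smallest and largest timestamps as Long.MIN_VALUE and Long.MAX_VALUE microseconds truncated to milliseconds, and that the global window ends one day before the largest timestamp; treat both as assumptions to check against the SDK version in use.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; mirrors the assumed definition of Beam's timestamp bounds.
final class WatermarkBounds {
    // "Start of time": Long.MIN_VALUE microseconds, expressed in milliseconds.
    public static final long MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

    // "End of time": Long.MAX_VALUE microseconds, expressed in milliseconds.
    public static final long MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

    // Assumption: the global window closes one day before the maximum timestamp.
    public static final long END_OF_GLOBAL_WINDOW_MILLIS =
        MAX_MILLIS - Duration.ofDays(1).toMillis();

    // Renders epoch milliseconds as an ISO-8601 instant in UTC.
    public static String format(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        System.out.println("start of time:        " + format(MIN_MILLIS));
        System.out.println("end of global window: " + format(END_OF_GLOBAL_WINDOW_MILLIS));
        System.out.println("end of time:          " + format(MAX_MILLIS));
    }
}
```

Under those assumptions, the start of time and the end of the global window should correspond to the two instants in the exception message (java.time prints a leading + on years with more than four digits). The values are therefore deliberate sentinels rather than an accidental overflow of a long.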

Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that area in the Spark runner, and a release is on the way, so 0.5.0 should be available within a few days anyway.

Thanks,
Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi @all!

I’m using Beam 0.4.0 and only the batch processing features of it.
While executing the pipeline I get an exception: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
First, since I’m not using the streaming features I’m wondering about watermarks (but this may be a Beam-internal thing, I don’t know).
Second, the timestamp stated in the exception message is really weird and looks a bit like an overflow in a long value to me.

Does anyone have a clue what the reason for this exception could be?

Thanks,
Rico.


Full Stacktrace:
2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
        at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


________________________________


GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO), David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting This email and any attachments may contain confidential or privileged information. Please note that unauthorized copying, disclosure or distribution of the material in this email is not permitted.


AW: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com>.
See below

From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Friday, February 3, 2017 15:31
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

OK, this is indeed a different stack trace - the problem now is in SparkGroupAlsoByWindowFn, which did not exist in 0.4.0 and which I hoped would fix any issues you've encountered.

More questions: is your data timestamped?
Yes (but only internally).
Is your pipeline aware of the timestamp fields (using DoFn#outputWithTimestamp or a source that defines the timestamp)?
No, we do not expose timestamps to the pipeline. For the pipeline these are simply fields in a record.

Looks like this <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138> is broken anyway; I don't think there is actually a time-order guarantee when processing a partition. Could you open a ticket, please? Thanks!

I’ll do this!

On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:

Due to restrictions in my contract I cannot show you the pipeline. But it’s a very complex one that we have been working on for several months already, also with Beam 0.4.0.

It is interesting to note that we already ran our pipeline successfully with that version. Now, in a series of 30 executions, about 20 get this exception while the others succeed…


The full stack trace:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Friday, 3 February 2017 13:10
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"


Is it the exact same stack trace? Would you mind sharing the stack trace and the pipeline?

Thanks,
Amit

On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi!

Thanks for the insights.

As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error … ☹

Any further ideas or suggestions?

Best,
Rico.

From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Thursday, 2 February 2017 17:15
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

Hi Rico,

Batch pipelines do use watermarks, in a sense: the "start watermark" sits at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Or, put in a more "Beam" way: the watermark starts at the beginning of time and, once all the elements have been processed, jumps to the end of time, because we know there are no more elements left to process.
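
The two timestamps in the exception message fit this explanation. The following is a minimal sketch using only java.time, with no Beam classes; the class and method names are illustrative, and it assumes the sentinels are Long.MIN_VALUE and Long.MAX_VALUE microseconds truncated to milliseconds. Under that assumption the conversion reproduces the "start of time" and "end of time" instants from the message, which suggests they are deliberate bounds rather than an arithmetic overflow:

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class WatermarkBounds {
    // Assumption: the "start of time" sentinel is Long.MIN_VALUE microseconds,
    // truncated to whole milliseconds.
    static Instant minWatermark() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
    }

    // Assumption: the "end of time" sentinel is Long.MAX_VALUE microseconds,
    // truncated to whole milliseconds.
    static Instant maxWatermark() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Print both bounds so they can be compared with the exception message.
        System.out.println("start of time: " + minWatermark());
        System.out.println("end of time:   " + maxWatermark());
    }
}
```

Note that java.time prints years beyond 9999 with a leading sign, so the formatting may differ slightly from the Joda-Time style used in the log.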

Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that area in the Spark runner, and a release is on the way, so 0.5.0 should be available within a few days anyway.

Thanks,
Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi @all!

I’m using Beam 0.4.0 and only the batch processing features of it.
While executing the pipeline I get an exception: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
First, since I’m not using the streaming features, I’m wondering why watermarks are involved at all (but this may be a Beam-internal thing, I don’t know).
Second, the timestamp stated in the exception message is really weird and looks a bit like an overflow in a long value to me.

Does anyone have a clue what the reason for this exception could be?

Thanks,
Rico.


Full Stacktrace:
2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
        at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


________________________________


GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO), David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting This email and any attachments may contain confidential or privileged information. Please note that unauthorized copying, disclosure or distribution of the material in this email is not permitted.


Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by Amit Sela <am...@gmail.com>.
OK, this is indeed a different stack trace. The problem is now in
SparkGroupAlsoByWindowFn, which did not exist in 0.4.0 and which I had hoped
would fix the issues you encountered.

More questions: is your data timestamped? Is your pipeline aware of the
timestamp fields (using DoFn#outputWithTimestamp or a source that defines
the timestamp)?

It looks like this
<https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138>
is broken anyway; I don't think there is actually a time-order guarantee when
processing a partition. Could you please open a ticket? Thanks!
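
The concern about ordering can be illustrated with a toy model. This is only a sketch, not the Spark runner's code: MonotonicWatermark and its methods are hypothetical names, and whether the runner really advances the watermark per element is not confirmed here. It shows that a watermark guarded by a "never move backwards" check fails as soon as it is advanced to element timestamps that arrive out of order, while advancing it once after all elements have been consumed does not depend on order:

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

public class MonotonicWatermark {
    // Toy stand-in for a runner's timer state; not Beam's actual class.
    private Instant inputWatermark = Instant.MIN;

    // Rejects any attempt to move the watermark to an earlier instant.
    void advanceInputWatermark(Instant newWatermark) {
        if (newWatermark.isBefore(inputWatermark)) {
            throw new IllegalStateException(
                "Cannot move input watermark time backwards from "
                    + inputWatermark + " to " + newWatermark);
        }
        inputWatermark = newWatermark;
    }

    // Advances the watermark to each element's timestamp in iteration order.
    // Returns false if the monotonicity check rejects any step.
    static boolean survivesPerElementAdvance(List<Instant> timestamps) {
        MonotonicWatermark wm = new MonotonicWatermark();
        try {
            for (Instant ts : timestamps) {
                wm.advanceInputWatermark(ts);
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // Consumes all elements without touching the watermark, then advances once.
    static Instant advanceOnceAtEnd(List<Instant> timestamps, Instant endOfTime) {
        MonotonicWatermark wm = new MonotonicWatermark();
        for (Instant ts : timestamps) {
            // Element processing would happen here; order does not matter.
        }
        wm.advanceInputWatermark(endOfTime);
        return wm.inputWatermark;
    }

    public static void main(String[] args) {
        List<Instant> unordered =
            Arrays.asList(Instant.ofEpochMilli(2000L), Instant.ofEpochMilli(1000L));
        System.out.println("per-element advance survives: "
            + survivesPerElementAdvance(unordered));
        System.out.println("single advance ends at: "
            + advanceOnceAtEnd(unordered, Instant.MAX));
    }
}
```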

On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <
Rico.Bergmann@ext.gfk.com> wrote:

>
>
> Due to restrictions in my contract I cannot show you the pipeline. It is a
> very complex one that we have been working on for several months already,
> also with Beam 0.4.0.
>
>
>
> It is interesting to note that we have already run our pipeline
> successfully with that version. Now, in a series of 30 executions, about 20
> fail with this exception; the others succeed.
>
>
>
> The full stack trace:
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> Caused by: java.lang.IllegalStateException: Cannot move input watermark
> time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>
>         at
> org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
>
>         at
> org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
>
>         at
> org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
>         at
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Friday, 3 February 2017 13:10
>
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Is it the exact same stack trace? Would you mind sharing the stack trace
> and the pipeline?
>
> Thanks,
> Amit
>
>
>
> On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
> Hi!
>
>
>
> Thanks for the insights.
>
>
>
> As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran
> into the same error … ☹
>
>
>
> Any further ideas or suggestions?
>
>
>
> Best,
>
> Rico.
>
>
>
> *From:* Amit Sela [mailto:amitsela33@gmail.com]
> *Sent:* Thursday, 2 February 2017 17:15
> *To:* user@beam.apache.org
> *Subject:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Hi Rico,
>
>
>
> Batch pipelines do use watermarks, in a sense: the "start watermark" sits
> at the beginning of time and the "end watermark" at the end of time (this is
> the "overflow" you see). Or, put in a more "Beam" way: the watermark starts
> at the beginning of time and, once all the elements have been processed,
> jumps to the end of time, because we *know* there are no more elements left
> to process.
>
>
>
> Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around
> that area in the Spark runner, and a release is on the way, so 0.5.0 should
> be available within a few days anyway.
>
>
>
> Thanks,
>
> Amit
>
>
>
> On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
> Hi @all!
>
>
>
> I’m using Beam 0.4.0 and only the batch processing features of it.
>
> While executing the pipeline I get an exception: Cannot move input
> watermark time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
> First, since I’m not using the streaming features, I’m wondering why
> watermarks are involved at all (but this may be a Beam-internal thing, I
> don’t know).
>
> Second, the timestamp stated in the exception message is really weird and
> looks a bit like an overflow in a long value to me.
>
>
>
> Does anyone have a clue what the reason for this exception could be?
>
>
>
> Thanks,
>
> Rico.
>
>
>
>
>
> Full Stacktrace:
>
> 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster
> (Logging.scala:logError(95)) - User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> Caused by: java.lang.IllegalStateException: Cannot move input watermark
> time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>
>         at
> org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
>
>         at
> org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
>
>         at
> com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>
>         at
> com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>

Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com>.
Due to restrictions in my contract I cannot show you the pipeline. It is a very complex one that we have been working on for several months already, also with Beam 0.4.0.

It is interesting to note that we have already run our pipeline successfully with that version. Now, in a series of 30 executions, about 20 fail with this exception; the others succeed.


The full stack trace:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
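
For readers following the trace: the `checkState` frame directly below `advanceInputWatermark` suggests a precondition that the input watermark may only stay where it is or move forward. A minimal sketch of such a check is shown here, using `java.time` instead of Beam's own types. The class `MonotonicWatermark` and its method bodies are hypothetical illustrations, not Beam's source, and the sketch says nothing about why the pipeline hits the check intermittently.

```java
import java.time.Instant;

/** Hypothetical stand-in for an in-memory watermark holder; not Beam's actual class. */
final class MonotonicWatermark {
    private Instant inputWatermark;

    MonotonicWatermark(Instant initial) {
        this.inputWatermark = initial;
    }

    /** Accepts the same or a later time; rejects any earlier time. */
    void advanceInputWatermark(Instant newWatermark) {
        if (newWatermark.isBefore(inputWatermark)) {
            // Message text copied from the exception quoted in this thread.
            throw new IllegalStateException(String.format(
                "Cannot move input watermark time backwards from %s to %s",
                inputWatermark, newWatermark));
        }
        inputWatermark = newWatermark;
    }

    Instant currentInputWatermark() {
        return inputWatermark;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-02-02T00:00:00Z");
        Instant end = Instant.parse("2017-02-03T00:00:00Z");

        MonotonicWatermark wm = new MonotonicWatermark(start);
        wm.advanceInputWatermark(end); // forward: accepted
        wm.advanceInputWatermark(end); // unchanged: accepted
        try {
            wm.advanceInputWatermark(start); // backwards: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check of this shape, the exception can only appear when an object whose watermark has already been advanced is later handed an earlier time.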


From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Friday, 3 February 2017 13:10
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"


Is it the exact same stack trace? Would you mind sharing the stack trace and the pipeline?

Thanks,
Amit

On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi!

Thanks for the insights.

As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error … ☹

Any further ideas or suggestions?

Best,
Rico.

From: Amit Sela [mailto:amitsela33@gmail.com]
Sent: Thursday, 2 February 2017 17:15
To: user@beam.apache.org
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

Hi Rico,

Batch does use watermarks, in a sense: the "start watermark" is set at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Or, in a more "Beam" way: the watermark starts at the beginning of time, and after all the elements have been processed it jumps to the end of time, because we know there are no more elements left to process.

Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that area in the Spark runner, and the release is on the way, so 0.5.0 should be available within a few days anyway.

Thanks,
Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi @all!

I’m using Beam 0.4.0 and only the batch processing features of it.
While executing the pipeline I get an exception: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
First, since I’m not using the streaming features I’m wondering about watermarks (but this may be a Beam-internal thing, I don’t know).
Second, the timestamp stated in the exception message is really weird and looks a bit like an overflow in a long value to me.

Does anyone have a clue what the reason for this exception could be?

Thanks,
Rico.


Full Stacktrace:
2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
        at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


________________________________


GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO), David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting This email and any attachments may contain confidential or privileged information. Please note that unauthorized copying, disclosure or distribution of the material in this email is not permitted.


Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by Amit Sela <am...@gmail.com>.
Is it the exact same stack trace? Would you mind sharing the stack trace
and the pipeline?

Thanks,
Amit


Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com>.
Hi!

Thanks for the insights.

As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error … ☹

Any further ideas or suggestions?

Best,
Rico.




Re: possible reasons for exception "Cannot move input watermark time backwards from"

Posted by Amit Sela <am...@gmail.com>.
Hi Rico,

Batch does use watermarks, in a sense: the "start watermark" is set at the
beginning of time and the "end watermark" at the end of time (this is the
"overflow" you see). Or, in a more "Beam" way: the watermark starts at the
beginning of time, and after all the elements have been processed it jumps
to the end of time, because we *know* there are no more elements left to
process.
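
To make the two timestamps concrete: both values in the exception message can be reproduced with the JDK alone, under the assumption (not confirmed in this thread) that the bounds are the `long` range read as microseconds and truncated to milliseconds, and that the global window ends one day before the upper bound. The class name `WatermarkBounds` and its constants are illustrative, not Beam identifiers.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** JDK-only sketch of the bounds a batch watermark moves between (assumed derivation). */
final class WatermarkBounds {
    // Assumption: "start of time" is Long.MIN_VALUE microseconds, truncated to milliseconds.
    static final Instant MIN =
        Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));

    // Assumption: "end of time" is Long.MAX_VALUE microseconds, truncated to milliseconds.
    static final Instant MAX =
        Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

    // Assumption: the global window ends one day before the upper bound.
    static final Instant END_OF_GLOBAL_WINDOW = MAX.minus(Duration.ofDays(1));

    public static void main(String[] args) {
        System.out.println("start of time:        " + MIN);
        System.out.println("end of global window: " + END_OF_GLOBAL_WINDOW);
        System.out.println("end of time:          " + MAX);
    }
}
```

The first two printed values correspond to the "to" and "from" timestamps in the exception (`java.time` prints a leading `+` on the large positive year). This suggests the message describes a watermark that is already at the end of the global window being asked to return to the start of time, rather than an arithmetic overflow.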

Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around
that area in the Spark runner, and the release is on the way, so 0.5.0
should be available within a few days anyway.

Thanks,
Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <
Rico.Bergmann@ext.gfk.com> wrote:

> Hi @all!
>
>
>
> I’m using Beam 0.4.0 and only the batch processing features of it.
>
> While executing the pipeline I get an exception: Cannot move input
> watermark time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
> First, since I’m not using the streaming features I’m wondering about
> watermarks (but this may be a Beam-internal thing, I don’t know).
>
> Second, the timestamp stated in the exception message is really weird and
> looks a bit like an overflow in a long value to me.
>
>
>
> Does anyone have a clue what the reason for this exception could be?
>
>
>
> Thanks,
>
> Rico.
>
>
>
>
>
> Full Stacktrace:
>
> 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster
> (Logging.scala:logError(95)) - User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> Caused by: java.lang.IllegalStateException: Cannot move input watermark
> time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>
>         at
> org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
>
>         at
> org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
>
>         at
> com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>
>         at
> com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>