You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Arvid Heise (JIRA)" <ji...@apache.org> on 2017/04/27 12:32:04 UTC
[jira] [Created] (BEAM-2095) SourceRDD hasNext not idempotent
Arvid Heise created BEAM-2095:
---------------------------------
Summary: SourceRDD hasNext not idempotent
Key: BEAM-2095
URL: https://issues.apache.org/jira/browse/BEAM-2095
Project: Beam
Issue Type: Bug
Components: runner-spark
Affects Versions: 0.6.0
Reporter: Arvid Heise
Assignee: Amit Sela
When reading an Avro from HDFS with the new HDFSFileSource, we experience the following exceptions:
{code}
17/04/27 11:48:38 ERROR executor.Executor: Exception in task 2.0 in stage 1.0 (TID 32)
java.util.NoSuchElementException
at com.gfk.hyperlane.engine.target_group_evaluation.dataset.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:498)
at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:105)
at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:48)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
The error comes from a call to BoundedReader#getCurrent after it has been closed.
We logged the following call patterns:
(for data)
advance
getCurrent
(when drained)
advance
close
getCurrent
The issue probably comes from the implementation in SourceRDD
https://github.com/apache/beam/blob/3101e69c438d5c42577fc7d3476d623f6e551837/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L145
A repeated call to hasNext will result in repeated calls of advance. This results in a data loss and may return different results. In particular, it may cause the issue as observed.
The usual solution is to use hasNext() to already retrieve and cache the next element if cache empty and return and reset the cache in next().
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)