You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Arvid Heise (JIRA)" <ji...@apache.org> on 2017/04/27 12:32:04 UTC

[jira] [Created] (BEAM-2095) SourceRDD hasNext not idempotent

Arvid Heise created BEAM-2095:
---------------------------------

             Summary: SourceRDD hasNext not idempotent
                 Key: BEAM-2095
                 URL: https://issues.apache.org/jira/browse/BEAM-2095
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 0.6.0
            Reporter: Arvid Heise
            Assignee: Amit Sela


When reading an Avro from HDFS with the new HDFSFileSource, we experience the following exceptions:

{code}
17/04/27 11:48:38 ERROR executor.Executor: Exception in task 2.0 in stage 1.0 (TID 32)
java.util.NoSuchElementException
	at com.gfk.hyperlane.engine.target_group_evaluation.dataset.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:498)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
	at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:105)
	at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:48)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

The error comes from a call to BoundedReader#getCurrent after it has been closed.

We logged the following call patterns:
(for data)
  advance
  getCurrent
(when drained)
advance
  close
getCurrent

The issue probably comes from the implementation in SourceRDD 
https://github.com/apache/beam/blob/3101e69c438d5c42577fc7d3476d623f6e551837/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L145

A repeated call to hasNext will result in repeated calls of advance. This results in a data loss and may return different results. In particular, it may cause the issue as observed.

The usual solution is to use hasNext() to already retrieve and cache the next element if cache empty and return and reset the cache in next().



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)