Posted to issues@spark.apache.org by "Xusen Yin (JIRA)" <ji...@apache.org> on 2016/02/04 01:09:39 UTC

[jira] [Created] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

Xusen Yin created SPARK-13178:
---------------------------------

             Summary: RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
                 Key: SPARK-13178
                 URL: https://issues.apache.org/jira/browse/SPARK-13178
             Project: Spark
          Issue Type: Bug
          Components: SparkR
            Reporter: Xusen Yin


The KMeans algorithm uses a zip operation when taking samples (see https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L355). The pattern can be reduced to the following code:

{code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
val rdd =  ...
val rdd2 = rdd.map(x => x)
rdd.zip(rdd2).count()
{code}

However, RRDD fails on this operation with either a "can only zip rdd with same number of elements" error or a "stream closed" error, similar to the JIRA issue https://issues.apache.org/jira/browse/SPARK-2251

Inside RRDD, a data stream is used to ingest data from the R side. Zipping an RDD with itself computes each partition twice, once for each side of the zip. If we zip a HadoopRDD with itself, both sides of every pair match:

{code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
we get a pair (6.8, 6.8)
we get a pair (5.1, 5.1)
we get a pair (6.7, 6.7)
we get a pair (4.9, 4.9)
we get a pair (6.0, 6.0)
we get a pair (4.7, 4.7)
we get a pair (5.7, 5.7)
we get a pair (4.6, 4.6)
we get a pair (5.5, 5.5)
we get a pair (5.0, 5.0)
we get a pair (5.5, 5.5)
we get a pair (5.4, 5.4)
we get a pair (5.8, 5.8)
we get a pair (4.6, 4.6)
we get a pair (6.0, 6.0)
we get a pair (5.0, 5.0)
we get a pair (5.4, 5.4)
we get a pair (4.4, 4.4)
we get a pair (6.0, 6.0)
we get a pair (4.9, 4.9)
we get a pair (6.7, 6.7)
we get a pair (5.4, 5.4)
we get a pair (6.3, 6.3)
we get a pair (4.8, 4.8)
we get a pair (5.6, 5.6)
we get a pair (4.8, 4.8)
we get a pair (5.5, 5.5)
we get a pair (4.3, 4.3)
we get a pair (5.5, 5.5)
we get a pair (5.8, 5.8)
we get a pair (6.1, 6.1)
we get a pair (5.7, 5.7)
we get a pair (5.8, 5.8)
we get a pair (5.4, 5.4)
we get a pair (5.0, 5.0)
we get a pair (5.1, 5.1)
we get a pair (5.6, 5.6)
we get a pair (5.7, 5.7)
we get a pair (5.7, 5.7)
we get a pair (5.1, 5.1)
we get a pair (5.7, 5.7)
we get a pair (5.4, 5.4)
we get a pair (6.2, 6.2)
we get a pair (5.1, 5.1)
we get a pair (5.1, 5.1)
we get a pair (4.6, 4.6)
we get a pair (5.7, 5.7)
we get a pair (5.1, 5.1)
we get a pair (6.3, 6.3)
we get a pair (4.8, 4.8)
we get a pair (5.8, 5.8)
we get a pair (5.0, 5.0)
we get a pair (7.1, 7.1)
we get a pair (5.0, 5.0)
we get a pair (6.3, 6.3)
we get a pair (5.2, 5.2)
we get a pair (6.5, 6.5)
we get a pair (5.2, 5.2)
we get a pair (7.6, 7.6)
we get a pair (4.7, 4.7)
we get a pair (4.9, 4.9)
we get a pair (4.8, 4.8)
we get a pair (7.3, 7.3)
we get a pair (5.4, 5.4)
we get a pair (6.7, 6.7)
we get a pair (5.2, 5.2)
we get a pair (7.2, 7.2)
we get a pair (5.5, 5.5)
we get a pair (6.5, 6.5)
we get a pair (4.9, 4.9)
we get a pair (6.4, 6.4)
we get a pair (5.0, 5.0)
we get a pair (6.8, 6.8)
we get a pair (5.5, 5.5)
we get a pair (5.7, 5.7)
we get a pair (4.9, 4.9)
we get a pair (5.8, 5.8)
we get a pair (4.4, 4.4)
we get a pair (6.4, 6.4)
we get a pair (5.1, 5.1)
we get a pair (6.5, 6.5)
we get a pair (5.0, 5.0)
we get a pair (7.7, 7.7)
we get a pair (4.5, 4.5)
we get a pair (7.7, 7.7)
we get a pair (4.4, 4.4)
we get a pair (6.0, 6.0)
we get a pair (5.0, 5.0)
we get a pair (6.9, 6.9)
we get a pair (5.1, 5.1)
we get a pair (5.6, 5.6)
we get a pair (4.8, 4.8)
we get a pair (7.7, 7.7)
we get a pair (6.3, 6.3)
we get a pair (5.1, 5.1)
we get a pair (6.7, 6.7)
we get a pair (4.6, 4.6)
we get a pair (7.2, 7.2)
we get a pair (5.3, 5.3)
we get a pair (6.2, 6.2)
we get a pair (5.0, 5.0)
we get a pair (6.1, 6.1)
we get a pair (7.0, 7.0)
we get a pair (6.4, 6.4)
we get a pair (6.4, 6.4)
we get a pair (7.2, 7.2)
we get a pair (6.9, 6.9)
we get a pair (7.4, 7.4)
we get a pair (5.5, 5.5)
we get a pair (7.9, 7.9)
we get a pair (6.5, 6.5)
we get a pair (6.4, 6.4)
we get a pair (5.7, 5.7)
we get a pair (6.3, 6.3)
we get a pair (6.3, 6.3)
we get a pair (6.1, 6.1)
we get a pair (4.9, 4.9)
we get a pair (7.7, 7.7)
we get a pair (6.6, 6.6)
we get a pair (6.3, 6.3)
we get a pair (5.2, 5.2)
we get a pair (6.4, 6.4)
we get a pair (5.0, 5.0)
we get a pair (6.0, 6.0)
we get a pair (5.9, 5.9)
we get a pair (6.9, 6.9)
we get a pair (6.0, 6.0)
we get a pair (6.7, 6.7)
we get a pair (6.1, 6.1)
we get a pair (6.9, 6.9)
we get a pair (5.6, 5.6)
we get a pair (5.8, 5.8)
we get a pair (6.7, 6.7)
we get a pair (6.8, 6.8)
we get a pair (5.6, 5.6)
we get a pair (6.7, 6.7)
we get a pair (5.8, 5.8)
we get a pair (6.7, 6.7)
we get a pair (6.2, 6.2)
we get a pair (6.3, 6.3)
we get a pair (5.6, 5.6)
we get a pair (6.5, 6.5)
we get a pair (5.9, 5.9)
we get a pair (6.2, 6.2)
we get a pair (6.1, 6.1)
we get a pair (5.9, 5.9)
we get a pair (6.3, 6.3)
we get a pair (6.1, 6.1)
we get a pair (6.4, 6.4)
we get a pair (6.6, 6.6)
{code}

With RRDD in the same setting, however, the two sides of most pairs differ:

{code:title=log-from-zip-RRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
we get a pair (5.1, 5.1)
we get a pair (4.9, 4.7)
we get a pair (4.6, 5.0)
we get a pair (5.4, 4.6)
we get a pair (5.0, 4.4)
we get a pair (4.9, 5.4)
we get a pair (4.8, 4.8)
we get a pair (4.3, 5.8)
we get a pair (5.7, 5.4)
we get a pair (5.1, 5.7)
we get a pair (5.1, 5.4)
we get a pair (5.1, 4.6)
we get a pair (5.1, 4.8)
we get a pair (5.0, 5.0)
we get a pair (5.2, 5.2)
we get a pair (4.7, 4.8)
we get a pair (5.4, 5.2)
we get a pair (5.5, 4.9)
we get a pair (5.0, 5.5)
we get a pair (4.9, 4.4)
we get a pair (5.1, 5.0)
we get a pair (4.5, 4.4)
we get a pair (5.0, 5.1)
we get a pair (4.8, 5.1)
we get a pair (4.6, 5.3)
we get a pair (5.0, 7.0)
we get a pair (6.4, 6.9)
we get a pair (5.5, 6.5)
we get a pair (5.7, 6.3)
we get a pair (4.9, 6.6)
we get a pair (5.2, 5.0)
we get a pair (5.9, 6.0)
we get a pair (6.1, 5.6)
we get a pair (6.7, 5.6)
we get a pair (5.8, 6.2)
we get a pair (5.6, 5.9)
we get a pair (6.1, 6.3)
we get a pair (6.1, 6.4)
we get a pair (6.6, 6.8)
we get a pair (6.7, 6.0)
we get a pair (5.7, 5.5)
we get a pair (5.5, 5.8)
we get a pair (6.0, 5.4)
we get a pair (6.0, 6.7)
we get a pair (6.3, 5.6)
we get a pair (5.5, 5.5)
we get a pair (6.1, 5.8)
we get a pair (5.0, 5.6)
we get a pair (5.7, 5.7)
we get a pair (6.2, 5.1)
we get a pair (5.7, 6.3)
we get a pair (5.8, 7.1)
we get a pair (6.3, 6.5)
we get a pair (7.6, 4.9)
we get a pair (7.3, 6.7)
we get a pair (7.2, 6.5)
we get a pair (6.4, 6.8)
we get a pair (5.7, 5.8)
we get a pair (6.4, 6.5)
we get a pair (7.7, 7.7)
we get a pair (6.0, 6.9)
we get a pair (5.6, 7.7)
we get a pair (6.3, 6.7)
we get a pair (7.2, 6.2)
we get a pair (6.1, 6.4)
we get a pair (7.2, 7.4)
we get a pair (7.9, 6.4)
we get a pair (6.3, 6.1)
we get a pair (7.7, 6.3)
we get a pair (6.4, 6.0)
we get a pair (6.9, 6.7)
we get a pair (6.9, 5.8)
we get a pair (6.8, 6.7)
we get a pair (6.7, 6.3)
we get a pair (6.5, 6.2)
we need to close stream java.io.DataInputStream@507affd3 in thread 127
we need to close stream java.io.DataInputStream@507affd3 in thread 127
{code}

Most of the pairs are mismatched, and the end of the log shows that the same data stream is closed twice.
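
One possible reading of the log above is that both sides of the zip end up pulling from a single shared stream. The following is a minimal sketch of that effect in plain Scala, not the actual RRDD code: the object SharedStreamZip and its helpers are hypothetical names, and the sample values are simply the first few that appear in the RRDD log. When each side has its own stream the pairs match; when both sides share one stream, the reads interleave, so the pairs are mismatched and only half as many are produced.

{code:title=shared-stream-zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical model of the problem; none of these names come from Spark.
object SharedStreamZip {
  // Serialize the values into a byte stream, standing in for data sent by a worker process.
  private def open(values: Seq[Double]): DataInputStream = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    values.foreach(v => out.writeDouble(v))
    out.flush()
    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
  }

  // Iterator that lazily reads doubles (8 bytes each) from the given stream.
  private def reader(in: DataInputStream): Iterator[Double] = new Iterator[Double] {
    def hasNext: Boolean = in.available() >= 8
    def next(): Double = in.readDouble()
  }

  // Each side of the zip reads from its own stream, so both sides see every element.
  def independentPairs(values: Seq[Double]): List[(Double, Double)] =
    reader(open(values)).zip(reader(open(values))).toList

  // Both sides read from the same stream, so consecutive elements are split between them.
  def sharedPairs(values: Seq[Double]): List[(Double, Double)] = {
    val shared = open(values)
    reader(shared).zip(reader(shared)).toList
  }

  def main(args: Array[String]): Unit = {
    val values = Seq(4.9, 4.7, 4.6, 5.0, 5.4, 4.6)
    println(independentPairs(values)) // six pairs, both sides equal
    println(sharedPairs(values))      // three pairs: (4.9,4.7), (4.6,5.0), (5.4,4.6)
  }
}
{code}

The sample uses an even number of values on purpose. With an odd number, the last read on the shared stream would hit end of stream on one side, which is roughly analogous to the "stream closed" failure mentioned earlier.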

The simplest way to avoid the error is to call "cache" on the RDD so that each partition is not recomputed for the second side of the zip. However, sometimes we do not want to cache the data.
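
As an illustration of the cache workaround, here is a minimal sketch of the call pattern, assuming a local SparkContext and a parallelized collection as a stand-in for the RDD coming from the R side. The object name ZipWithCache and the method zipCountWithCache are hypothetical, and a parallelized RDD would not fail without the cache, so this only shows where the cache call goes.

{code:title=zip-with-cache.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper; only the Spark calls themselves are real API.
object ZipWithCache {
  // Caches the parent RDD, zips it with a mapped copy, and returns the number of pairs.
  def zipCountWithCache(n: Int): Long = {
    val conf = new SparkConf().setAppName("zip-with-cache").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for the RDD in the report; cache() marks it for in-memory storage.
      val rdd = sc.parallelize(1 to n, 4).map(_.toDouble).cache()
      val rdd2 = rdd.map(x => x)
      val pairs = rdd.zip(rdd2).count()
      // Release the cached blocks once the zip has been evaluated.
      rdd.unpersist()
      pairs
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(zipCountWithCache(100))
}
{code}

The cost is the memory held by the cached partitions until unpersist is called, which is the reason caching is not always acceptable.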


