You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Eemil Lagerspetz (JIRA)" <ji...@apache.org> on 2014/08/04 14:44:11 UTC

[jira] [Comment Edited] (SPARK-2579) Reading from S3 returns an inconsistent number of items with Spark 0.9.1

    [ https://issues.apache.org/jira/browse/SPARK-2579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14083484#comment-14083484 ] 

Eemil Lagerspetz edited comment on SPARK-2579 at 8/4/14 12:42 PM:
------------------------------------------------------------------

Hi,
I have tried with 1.0.1. Result is the same, not the right amount of partitions:
Row item counts: 999912

This happens only with a recently saved set of data. For an older set, it works fine.
If the data set is less than 8 hours old (maybe it also happens with a little older set), then the read result is not consistent.
This does not happen when using distcp.

Some additional notes:
I am starting a Spot cluster of 10 slaves, m4.4xlarge, on us-west-2 using the spark-ec2 scripts.  I ssh to the master and copy-dir the code to every machine. I then run my code on the master.
I use Maven appassembler to set the classpath. So, I have parallel accesses to S3, maybe you can try with a cluster instead of a single machine? Thanks.


was (Author: lagerspetz):
Hi,
I have tried with 1.0.1. Result is the same, not the right amount of partitions:
Row item counts: 999912

This happens only with a recently saved set of data. For an older set, it works fine.
If the data set is less than 8 hours old (maybe it also happens with a little older set), then the read result is not consistent.
This does not happen when using distcp.


> Reading from S3 returns an inconsistent number of items with Spark 0.9.1
> ------------------------------------------------------------------------
>
>                 Key: SPARK-2579
>                 URL: https://issues.apache.org/jira/browse/SPARK-2579
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 0.9.1
>            Reporter: Eemil Lagerspetz
>            Priority: Critical
>              Labels: hdfs, read, s3, skipping
>
> I have created a random matrix of 1M rows with 10K items on each row, semicolon-separated. While reading it with Spark 0.9.1 and doing a count, I consistently get less than 1M rows, and a different number every time at that ( !! ). Example below:
> head -n 1 tool-generate-random-matrix*log
> ==> tool-generate-random-matrix-999158.log <==
> Row item counts: 999158
> ==> tool-generate-random-matrix.log <==
> Row item counts: 997163
> The data is split into 1000 partitions. When I download it using s3cmd sync, and run the following AWK on it, I get the correct number of rows in each partition (1000x1000 = 1M). What is up?
> {code:title=checkrows.sh|borderStyle=solid}
> for k in part-0*
> do
>   echo $k
>   awk -F ";" '
>     NF != 10000 {
>       print "Wrong number of items:",NF
>     }
>     END {
>       if (NR != 1000) {
>         print "Wrong number of rows:",NR
>       }
>     }' "$k"
> done
> {code}
> The matrix generation and counting code is below:
> {code:title=Matrix.scala|borderStyle=solid}
> package fi.helsinki.cs.nodes.matrix
> import java.util.Random
> import org.apache.spark._
> import org.apache.spark.SparkContext._
> import scala.collection.mutable.ListBuffer
> import org.apache.spark.rdd.RDD
> import org.apache.spark.storage.StorageLevel._
> object GenerateRandomMatrix {
>   def NewGeMatrix(rSeed: Int, rdd: RDD[Int], features: Int) = {
>     rdd.mapPartitions(part => part.map(xarr => {
>         val rdm = new Random(rSeed + xarr)
>         val arr = new Array[Double](features)
>         for (i <- 0 until features)
>           arr(i) = rdm.nextDouble()
>         new Row(xarr, arr)
>       }))
>   }
>   case class Row(id: Int, elements: Array[Double]) {}
>   def rowFromText(line: String) = {
>     val idarr = line.split(" ")
>     val arr = idarr(1).split(";")
>     // -1 to fix saved matrix indexing error
>     new Row(idarr(0).toInt-1, arr.map(_.toDouble))
>   }
>   def main(args: Array[String]) {
>     val master = args(0)
>     val tasks = args(1).toInt
>     val savePath = args(2)
>     val read = args.contains("read")
>     
>     val datapoints = 1000000
>     val features = 10000
>     val sc = new SparkContext(master, "RandomMatrix")
>     if (read) {
>       val randomMatrix: RDD[Row] = sc.textFile(savePath, tasks).map(rowFromText).persist(MEMORY_AND_DISK)
>       println("Row item counts: "+ randomMatrix.count)
>     } else {
>       val rdd = sc.parallelize(0 until datapoints, tasks)
>       val bcSeed = sc.broadcast(128)
>       /* Generating a matrix of random Doubles */
>       val randomMatrix = NewGeMatrix(bcSeed.value, rdd, features).persist(MEMORY_AND_DISK)
>       randomMatrix.map(row => row.id + " " + row.elements.mkString(";")).saveAsTextFile(savePath)
>     }
>     
>     sc.stop
>   }
> }
> {code}
> I run this with:
> appassembler/bin/tool-generate-random-matrix master 1000 s3n://keys@path/to/data 1>matrix.log 2>matrix.err
> Reading from HDFS gives the right count and right number of items on each row. However, I had to run with the full path with the server name, just /matrix does not work (it thinks I want file://):
> p="hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000/matrix"
> appassembler/bin/tool-generate-random-matrix $( cat /root/spark-ec2/cluster-url ) 1000 "$p" read 1>readmatrix.log 2>readmatrix.err



--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org