Posted to user@spark.apache.org by James Hammerton <ja...@gluru.co> on 2016/02/23 19:22:32 UTC

Count job stalling at shuffle stage on 3.4TB input (but only 5.3GB shuffle write)

Hi,

I have been having problems processing a 3.4TB data set - uncompressed,
tab-separated text - containing object creation/update events from our
system, one event per line.

I decided to see what happens with a count of the number of events (= the
number of lines in the text files) and a count of the number of distinct
object ids, both of which I thought should be straightforward enough to
succeed.

The job stalled at the end of the first stage (55657 tasks; 1 task failed,
but I've seen processing continue to the next stage despite small numbers
of failures), even though it only generated a 5.3GB shuffle write. It ran
for 2.5 hours and is now sitting there apparently doing nothing.

Does this suggest something is wrong with the cluster? Computing either
count should be straightforward despite the size of the data set - or am I
missing something?

The setup is a spark-ec2-generated cluster (trying EMR will be my next
move, along with bucketing the data via Parquet) running Spark 1.5.2 on
OpenJDK 8 (this particular job is Scala, though others are Java), with
r3.2xlarge instance types and 5 slaves, each with a 500GB EBS volume that
SPARK_LOCAL_DIRS points to.
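
For reference, the Parquet conversion would be roughly along these lines.
This is only a sketch: it uses partitionBy rather than true bucketing, the
output path and the choice of eventName as the partitioning column are
placeholders, and df is the DataFrame built in the code below.

    // Rewrite the TSV dump as Parquet so later jobs read a columnar,
    // compressed copy and can prune partitions they don't need.
    df.write
      .partitionBy("eventName")                   // placeholder partition column
      .parquet("s3n://<bucket>/events-parquet")   // placeholder output path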

The code is:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

    // conf, inputPath and startMillis are defined earlier in the job (not shown).
    val sc = new SparkContext(conf)
    try {
      // All four columns are read as plain strings.
      val rawSchema = StructType(Array(
        StructField("objectId", DataTypes.StringType, true),
        StructField("eventName", DataTypes.StringType, true),
        StructField("eventJson", DataTypes.StringType, true),
        StructField("timestampNanos", DataTypes.StringType, true)))
      val sqlContext = new SQLContext(sc)
      // Read the uncompressed tab-separated dump via spark-csv.
      val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .option("delimiter", "\t")
        .schema(rawSchema)
        .load(inputPath)
      val oids = df.select("objectId")
      val distinct = oids.distinct.count
      val events = oids.count
      println("Number of objectIds: " + distinct)
      println("Number of events: " + events)
      println("Elapsed time: " + (System.currentTimeMillis() - startMillis) / 1000 + "s")
    } finally {
      sc.stop() // assumed cleanup; the closing of the try block was trimmed from the paste
    }
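
As an aside, both numbers could presumably be computed in a single
aggregate pass instead of a distinct followed by two counts. A minimal
sketch, assuming the same df as above:

    import org.apache.spark.sql.functions.{count, countDistinct, lit}

    // One pass over the data: count(lit(1)) counts every row, while
    // countDistinct("objectId") shuffles by objectId for the distinct count.
    val row = df.agg(count(lit(1)).as("events"),
                     countDistinct("objectId").as("objectIds")).first()
    println("Number of objectIds: " + row.getLong(1))
    println("Number of events: " + row.getLong(0))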


Here's the plan as revealed by the SQL part of the UI:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#4L]
 Aggregate [objectId#0], [objectId#0]
  Project [objectId#0]
   Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#4L]
 Aggregate [objectId#0], [objectId#0]
  Project [objectId#0]
   Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)

== Optimized Logical Plan ==
Aggregate [count(1) AS count#4L]
 Aggregate [objectId#0]
  Project [objectId#0]
   Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#4L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#7L])
   TungstenAggregate(key=[objectId#0], functions=[], output=[])
    TungstenExchange hashpartitioning(objectId#0)
     TungstenAggregate(key=[objectId#0], functions=[], output=[objectId#0])
      Scan CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)[objectId#0]

Code Generation: true

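One detail I notice in the physical plan: the TungstenExchange
hashpartitioning(objectId#0) step should correspond to the shuffle that
produced the 5.3GB write, and the number of partitions it uses comes from
spark.sql.shuffle.partitions (200 by default). In case that turns out to
matter, a minimal sketch of overriding it on the SQLContext - the value
2000 below is only an illustration:

    // Raise the number of post-shuffle partitions before running the counts.
    // (Illustrative value only; must be set before the distinct/count actions.)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
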
Regards,

James