Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/29 02:05:01 UTC

[jira] [Resolved] (SPARK-27828) spark job hangs when kryo.serializers.FieldSerializer is called under multi-executor-cores settings

     [ https://issues.apache.org/jira/browse/SPARK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-27828.
----------------------------------
    Resolution: Cannot Reproduce

> spark job hangs when kryo.serializers.FieldSerializer is called under multi-executor-cores settings
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27828
>                 URL: https://issues.apache.org/jira/browse/SPARK-27828
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Itsuki Toyota
>            Priority: Major
>
> Hi,
> I found that a Spark job hangs when kryo.serializers.FieldSerializer is called and executors run with multiple cores (--executor-cores).
> Concretely, when I load facts (e.g., <Obama> <wasBornIn> <America>) from a gzipped .nt text file, convert them to RDD[Triple] [0], and evaluate the RDD, the Spark job hangs under specific conditions.
> The steps to reproduce are as follows:
> 1) Create .nt file
> {code:java}
> # BSBM data generator ( https://sourceforge.net/projects/bsbmtools/files/bsbmtools/bsbmtools-0.2/bsbmtools-v0.2.zip/download )
> $ ./generate -fc -s nt -fn dataset_10M -pc 28480{code}
> 2) Compress .nt file
> {code:java}
> $ spark-shell
> > import org.apache.hadoop.io.compress.GzipCodec
> > sc.textFile("dataset_10M.nt").repartition(100).saveAsTextFile("dataset_10M_gzip_100", classOf[GzipCodec]){code}
> 3) Load the .nt file and evaluate it (i.e., RDD.count) after repartitioning
> Code:
> {code:java}
>  
> package jp.hang.spark
>
> import java.io.{StringReader, ByteArrayInputStream}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.rdd.RDD
> import org.apache.jena.graph.Triple
> import org.apache.jena.rdf.model.{Model, ModelFactory}
>
> object MyTest {
>   private def argParser(args: Array[String]): ArgConfig = {
>     val optionParser = new scopt.OptionParser[ArgConfig]("spark-submit <spark commands>") {
>       head("AAAA")
>       opt[String]('i', "nt-path") required() valueName "<Path>" action {
>         (x, c) => c.copy(ntPath = x)
>       } text "Path to the ntriple"
>     }
>     optionParser.parse(args, ArgConfig()) match {
>       case Some(config) => config
>       case None => sys.exit(-1) // arguments are bad, error message will have been displayed
>     }
>   }
>
>   private case class ArgConfig(ntPath: String = "")
>
>   def main(args: Array[String]) {
>     val arguments: ArgConfig = argParser(args)
>     val conf = new SparkConf().setAppName("MyTest")
>     val sc = new SparkContext(conf)
>     val rawTriples: RDD[String] = sc.textFile(arguments.ntPath)
>     convertRawTriple(rawTriples).repartition(5000).count
>   }
>
>   private def convertRawTriple(rawTriples: RDD[String]): RDD[Triple] = {
>     rawTriples.mapPartitions { case iter =>
>       iter.map { case tripleText =>
>         val model: Model = ModelFactory.createDefaultModel
>         val r: StringReader = new StringReader(tripleText)
>         model.read(r, null, "N-TRIPLE") //scalastyle:ignore null
>         val stmt = model.listStatements.next()
>         val triple = Triple.create(stmt.getSubject.asNode, stmt.getPredicate.asNode, stmt.getObject.asNode)
>         r.close
>         model.close
>         triple
>       }
>     }
>   }
> }{code}
>  
> These commands show that when executor-cores is 1 Spark finishes the job, but when executor-cores is 5 Spark hangs regardless of how the input file is stored (e.g., compressed with gzip or uncompressed "as-is"):
> {code:java}
> $ spark-submit --executor-cores 1 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M.nt # ok
> $ spark-submit --executor-cores 5 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M.nt # hang
> $ spark-submit --executor-cores 1 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M_gzip_100 # ok
> $ spark-submit --executor-cores 5 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M_gzip_100 # hang{code}
> When spark hangs, jstack shows that kryo.serializers.FieldSerializer couldn't finish its task:
> {code:java}
> "Executor task launch worker-9" #164 daemon prio=5 os_prio=0 tid=0x00007fcb2cd5a000 nid=0x3416 in Object.wait() [0x00007fcaf0f48000]
>  java.lang.Thread.State: RUNNABLE
>  at sun.reflect.GeneratedSerializationConstructorAccessor325.newInstance(Unknown Source)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>  at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>  at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
>  at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
>  at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
>  at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
>  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>  at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:152)
>  at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196)
>  at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
>  at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
>  at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:798)
>  - locked <0x00000007b736da08> (a org.apache.spark.storage.BlockInfo)
>  at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645)
>  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x00000007b83e8768> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> {code}
> –
> I also found that Spark doesn't hang when I use a serializer other than kryo.serializers.FieldSerializer.
> For example, this serializer is based on STriple [1]:
> {code:java}
> class TripleSerializer extends KryoSerializer[Triple] {
>   override def write(kryo: Kryo, output: Output, triple: Triple) {
>     val protocol: TProtocol = TRDF.protocol(output)
>     val tterm: RDF_Term = new RDF_Term()
>     SerializerRDF.write(protocol, tterm, triple.getSubject)
>     SerializerRDF.write(protocol, tterm, triple.getPredicate)
>     SerializerRDF.write(protocol, tterm, triple.getObject)
>     TRDF.flush(protocol)
>   }
>
>   override def read(kryo: Kryo, input: Input, objClass: Class[Triple]): Triple = {
>     val protocol: TProtocol = TRDF.protocol(input)
>     val tterm: RDF_Term = new RDF_Term()
>     val s: Node = SerializerRDF.read(protocol, tterm)
>     val p: Node = SerializerRDF.read(protocol, tterm)
>     val o: Node = SerializerRDF.read(protocol, tterm)
>     Triple.create(s, p, o)
>   }
> }
> {code}
>  
> and SNode [2]:
> {code:java}
> class NodeSerializer extends KryoSerializer[Node] {
>   override def write(kryo: Kryo, output: Output, obj: Node) {
>     output.writeString(FmtUtils.stringForNode(obj))
>   }
>
>   override def read(kryo: Kryo, input: Input, objClass: Class[Node]): Node = {
>     val s = input.readString
>     RiotLib.parse(s)
>   }
> }
> {code}
>  
> Cheers,
> [0] Jena Triple (I used version 3.6.0): https://jena.apache.org/documentation/javadoc/jena/org/apache/jena/graph/Triple.html
> [1] https://github.com/apache/jena/blob/jena-3.6.0/jena-arq/src/main/java/org/apache/jena/riot/system/STriple.java
> [2] https://github.com/apache/jena/blob/jena-3.6.0/jena-arq/src/main/java/org/apache/jena/riot/system/SNode.java
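
For reference, the custom serializers quoted above only take effect once they are registered with Kryo; for classes with no registered or default serializer, Kryo falls back to FieldSerializer. A minimal sketch of that wiring follows. It assumes that TripleSerializer and NodeSerializer are compiled into the job jar in the same package as quoted, that KryoSerializer in those snippets is an alias for com.esotericsoftware.kryo.Serializer, and that Spark, Kryo and Jena are on the classpath. The names JenaKryoRegistrator and KryoConf are hypothetical and do not appear in the report.

```scala
package jp.hang.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.jena.graph.{Node, Triple}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator (name not from the report).
class JenaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Use the quoted TripleSerializer for Triple instead of the FieldSerializer fallback.
    kryo.register(classOf[Triple], new TripleSerializer)
    // Node is abstract, so a default serializer is used to also cover its subclasses.
    kryo.addDefaultSerializer(classOf[Node], new NodeSerializer)
  }
}

// Hypothetical helper that builds a SparkConf with Kryo and the registrator enabled.
object KryoConf {
  def build(): SparkConf =
    new SparkConf()
      .setAppName("MyTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[JenaKryoRegistrator].getName)
}
```

With this in place, MyTest.main could create its context with new SparkContext(KryoConf.build()). Whether this avoids the hang elsewhere is not established here; the report only states that the job did not hang with these serializers, and the issue was resolved as Cannot Reproduce.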


