Posted to user@spark.apache.org by "Mikhailau, Alex" <Al...@mlb.com> on 2017/07/26 18:47:20 UTC

DStream Spark 2.1.1 Streaming on EMR at scale - long running job fails after two hours

Guys,

I am trying hard to get a Spark Streaming job built on the DStream API to run on EMR. So far I can run it for a few hours before it eventually fails, at which point I start seeing out-of-memory exceptions in the aggregated logs from “yarn logs”.

In the map stage (mappedEvents) I parse each JSON event and extract a few fields with play-json:

val mappedEvents: DStream[(String, Iterable[Array[Byte]])] = ??? // JSON events mapped and keyed off of `user-id`

val stateFulRDD = mappedEvents
  .reduceByKeyAndWindow((x: Iterable[Array[Byte]], y: Iterable[Array[Byte]]) => {
    val x1 = x.map(arrayToUserSession)
    val y1 = y.map(arrayToUserSession)
    val z = x1 ++ y1
    val now = System.currentTimeMillis()
    z.groupBy(_.psid).map(_._2.maxBy(_.lastTime))
      .filter(l => l.lastTime + eventValiditySeconds * 1000 >= now)
      .map(userSessionToArray)
  }, windowSize, slidingInterval)
  .filter(_._2.size > 1)
  .mapWithState(stateSpec)

// Sessionization: via mapWithState I keep the last timestamp as the beginning of the session. Only keys with a session count > 1 are passed on, so that the state API is used less frequently.

val stateSpec = StateSpec.function(updateUserEvents _).timeout(windowSize.times(1).plus(batchInterval))

def updateUserEvents(key: String,
                     newValue: Option[scala.collection.immutable.Iterable[Array[Byte]]],
                     state: State[UserSessions]): Option[UserSessions]
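
The merge step inside the reduceByKeyAndWindow function above can be isolated and checked without Spark. The following is only a sketch under stated assumptions: UserSession here is a hypothetical stand-in with just the psid and lastTime fields, the byte-array (de)serialization is left out, and `now` is passed in as a parameter (the job itself reads System.currentTimeMillis()) so the result is deterministic.

```scala
// Hypothetical stand-in for the real session type; only the two
// fields used by the merge logic are modelled here.
final case class UserSession(psid: String, lastTime: Long)

object MergeSessions {
  // Combine two groups of sessions: keep the most recent entry per psid,
  // then drop entries whose validity window has already passed.
  def merge(x: Iterable[UserSession],
            y: Iterable[UserSession],
            eventValiditySeconds: Long,
            now: Long): Iterable[UserSession] =
    (x ++ y)
      .groupBy(_.psid)
      .map { case (_, sessions) => sessions.maxBy(_.lastTime) }
      .filter(s => s.lastTime + eventValiditySeconds * 1000 >= now)
}
```

Passing `now` explicitly is a choice made for this sketch only; it makes the expiry filter reproducible in a unit test.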


My window is 60 seconds, my slidingInterval is 10 seconds, and my batchInterval is 20 seconds.

My load test pushes 250K records per second (each record is around 1.6KB) through a Kinesis stream. The job runs on YARN on an EMR 5.7.0 cluster with 25 m4.2xlarge core nodes and an m4.4xlarge master, with plenty of sc1 EBS storage attached (10TB). I cannot sustain this load for longer than 2 hours before the cluster errors out.
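
For scale, the raw ingest volume implied by these numbers can be worked out directly. This is a rough back-of-the-envelope sketch, assuming 1.6KB means 1,600 bytes and ignoring replication, serialization overhead, and the JSON-to-session size reduction in the map stage; the object and field names are made up for the illustration.

```scala
object LoadEstimate {
  val recordsPerSecond: Long = 250000L
  val bytesPerRecord: Long = 1600L // assumption: 1.6KB taken as 1,600 bytes
  val batchSeconds: Long = 20L     // batchInterval
  val windowSeconds: Long = 60L    // window size

  // Raw payload arriving from Kinesis per second, per batch, and per window.
  val bytesPerSecond: Long = recordsPerSecond * bytesPerRecord
  val bytesPerBatch: Long = bytesPerSecond * batchSeconds
  val bytesPerWindow: Long = bytesPerSecond * windowSeconds

  // Total raw payload over the two hours the job survives.
  val bytesInTwoHours: Long = bytesPerSecond * 2L * 3600L

  // Decimal gigabytes, for readability only.
  def toGB(bytes: Long): Double = bytes / 1e9
}
```

Under these assumptions that is about 400 MB per second, 8 GB per 20-second batch, 24 GB per 60-second window, and roughly 2.88 TB over two hours.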

These are my job submission parameters:

aws emr add-steps --cluster-id $CLUSTER_ID --steps Name=SessionCount,Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/usr/lib/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn,--conf,spark.streaming.stopGracefullyOnShutdown=true,--conf,spark.locality.wait=7500ms,--conf,spark.streaming.blockInterval=10000ms,--conf,spark.shuffle.consolidateFiles=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.closure.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.dynamicAllocation.enabled=true,--conf,spark.scheduler.mode=FIFO,--conf,spark.ui.retainedJobs=50,--conf,spark.ui.retainedStages=50,--conf,spark.ui.retainedTasks=500,--conf,spark.worker.ui.retainedExecutors=50,--conf,spark.worker.ui.retainedDrivers=50,--conf,spark.sql.ui.retainedExecutions=50,--conf,spark.streaming.ui.retainedBatches=50,--conf,spark.rdd.compress=false,--conf,spark.yarn.executor.memoryOverhead=5120,--executor-memory,15G,--class,SessionCountX  - and job parameters follow

with env.json:
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.log-aggregation-enable": "true",
      "yarn.log-aggregation.retain-seconds": "-1",
      "yarn.nodemanager.remote-app-log-dir": "s3:\/\/my-bucket-logs",
      "yarn.nodemanager.vmem.check.enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]

Also, on the Executors page of the Spark UI, I see Input continuing to grow over time. In the load test each user-id is generated with UUID.random(), and I am not sure whether that is the cause, or whether I should instead load test with a finite set of user-ids to limit the key space in Spark, but it is something I noticed. Shuffle read/write size does eventually level off and stays about the same.
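
A load test with a limited key space could draw user-ids from a fixed pool instead of generating a fresh UUID per record. The helper below is a hypothetical sketch, not part of the job: the class name, pool size, and seed are all assumptions chosen for illustration.

```scala
import java.util.UUID
import scala.util.Random

// Hypothetical load-test helper: hands out user-ids from a fixed pool,
// so the number of distinct keys seen by Spark is bounded by poolSize.
final class BoundedUserIds(poolSize: Int, seed: Long) {
  require(poolSize > 0, "poolSize must be positive")

  private val rng = new Random(seed)

  // The pool is generated once; every later draw reuses one of these ids.
  private val pool: Vector[String] =
    Vector.fill(poolSize)(UUID.randomUUID().toString)

  def next(): String = pool(rng.nextInt(poolSize))
}
```

With a bounded pool the same keys recur across batches, which is closer to how real users behave than a new key for every record.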


The following exceptions are seen from a failed job:

17/07/26 07:13:51 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(28,[Lscala.Tuple2;@2147b84f,BlockManagerId(28, ip-10-202-138-81.mlbam.qa.us-east-1.bamgrid.net, 38630, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval


17/07/26 07:13:51 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Thread-9,5,main]
java.lang.OutOfMemoryError: Java heap space
                at io.netty.buffer.UnpooledHeapByteBuf.<init>(UnpooledHeapByteBuf.java:45)
                at io.netty.buffer.UnpooledUnsafeHeapByteBuf.<init>(UnpooledUnsafeHeapByteBuf.java:29)
                at io.netty.buffer.UnpooledByteBufAllocator.newHeapBuffer(UnpooledByteBufAllocator.java:59)
                at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:158)
                at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:149)
                at io.netty.buffer.Unpooled.buffer(Unpooled.java:116)
                at org.apache.spark.network.shuffle.protocol.BlockTransferMessage.toByteBuffer(BlockTransferMessage.java:78)
                at org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock(NettyBlockTransferService.scala:137)
                at org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:121)
                at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:1220)
                at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1067)
                at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
                at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
                at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
                at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:732)
                at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:80)
                at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
                at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
                at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
                at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:282)
                at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:352)
                at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
                at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
                at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
17/07/26 07:13:51 WARN KinesisRecordProcessor: No shardId for workerId ip-10-202-138-81.mlbam.qa.us-east-1.bamgrid.net:ce6e8bc3-d484-4d26-8bd1-168c933803e3?
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because it has not yet stared
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because it has not yet stared
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because it has not yet stared
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because it has not yet stared


# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 27914"...
os::fork_and_exec failed: Cannot allocate memory (12)
End of LogType:stdout


What else can I do or look for? Any help would be greatly appreciated. Thank you.