Posted to user@spark.apache.org by Palash Gupta <sp...@yahoo.com.INVALID> on 2017/02/08 06:36:14 UTC

[Spark 2.0.0] java.util.concurrent.TimeoutException while writing to mongodb from Spark

Hi All,
I'm writing a DataFrame to MongoDB using Stratio/Spark-MongoDB.
Initially it was working fine, but when the data volume got high it started giving me the error in the subject line; details are as follows.

Could anybody help me out, suggest what solution I should apply, or explain how I can increase the timeout value?

My cluster setup:
- Driver and executor run in the same VM, in local[5] mode
- spark.driver.memory: 50g
- MongoDB: 3.2.10
- Imported package: --packages com.stratio.datasource:spark-mongodb_2.11:0.12.0
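
For reference, the failing write call looks roughly like this (reconstructed from the traceback below, with credentials masked). I am already passing connectionsTime='300000' to the Stratio data source, but the futures still time out after 3 seconds as shown in the log:

    # Reconstructed from the traceback below; credentials masked.
    # connectionsTime (in ms) is the option I pass for the MongoDB client timeout.
    df_nokia_myprogram_kpi_ready_raw.write \
        .format("com.stratio.datasource.mongodb") \
        .mode('append') \
        .options(host='10.15.187.74:27017',
                 credentials='parsdev,parsdb,XXXX',
                 database='DB',
                 collection='MY_G_N_LN_HR',
                 connectionsTime='300000',
                 updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL') \
        .save()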

Details Log:
17/02/08 07:03:51 INFO scheduler.DAGScheduler: Job 93 failed: foreachPartition at MongodbDataFrame.scala:37, took 39.026989 s
17/02/08 07:03:51 INFO executor.Executor: Finished task 182.0 in stage 253.0 (TID 25297). 60483 bytes result sent to driver
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 185.0 in stage 253.0 (TID 25300)
17/02/08 07:03:51 INFO scheduler.TaskSetManager: Finished task 182.0 in stage 253.0 (TID 25297) in 3797 ms on localhost (183/200)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 185.0 in stage 253.0 (TID 25300, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 186.0 in stage 253.0 (TID 25301)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 186.0 in stage 253.0 (TID 25301, localhost): TaskKilled (killed intentionally)
[INFO] [02/08/2017 07:03:51.283] [mongodbClientFactory-akka.actor.default-dispatcher-4] [akka://mongodbClientFactory/deadLetters] Message [com.stratio.datasource.mongodb.client.MongodbClientActor$ClientResponse] from Actor[akka://mongodbClientFactory/user/mongoConnectionActor#1265577515] to Actor[akka://mongodbClientFactory/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 7 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 187.0 in stage 253.0 (TID 25302)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 187.0 in stage 253.0 (TID 25302, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 183.0 in stage 253.0 (TID 25298)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 253.0 (TID 25298, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 253.0, whose tasks have all completed, from pool
Traceback (most recent call last):
  File "/home/hadoop/development/myprogram/datareload_myprogram.py", line 1188, in <module>
    datareporcessing(expected_datetime,expected_directory_hdfs,sqlContext)
  File "/home/hadoop/development/myprogram/datareload_nokialte.py", line 935, in datareporcessing
    df_nokia_myprogram_kpi_ready_raw.write.format("com.stratio.datasource.mongodb").mode('append').options(host='10.15.187.74:27017', credentials='parsdev,parsdb,XXXX', database='DB', collection='MY_G_N_LN_HR', connectionsTime='300000', updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL').save()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 530, in save
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o839.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 184 in stage 253.0 failed 1 times, most recent failure: Lost task 184.0 in stage 253.0 (TID 25299, localhost): java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at com.stratio.datasource.mongodb.client.MongodbClientFactory$.getClient(MongodbClientFactory.scala:103)
        at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:63)
        at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:42)
        at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:37)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:883)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2117)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2117)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2117)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2116)
        at com.stratio.datasource.mongodb.MongodbDataFrame.saveToMongodb(MongodbDataFrame.scala:37)
        at com.stratio.datasource.mongodb.MongodbRelation.insert(MongodbRelation.scala:106)
        at com.stratio.datasource.mongodb.DefaultSource.createRelation(DefaultSource.scala:59)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:211)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at com.stratio.datasource.mongodb.client.MongodbClientFactory$.getClient(MongodbClientFactory.scala:103)
        at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:63)
        at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:42)
        at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:37)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more

17/02/08 07:03:51 INFO spark.SparkContext: Invoking stop() from shutdown hook

Thanks & Best Regards,
Palash Gupta