Posted to user@spark.apache.org by Marco <ma...@gmail.com> on 2016/01/18 15:27:57 UTC

Calling SparkContext methods in scala Future

Hello,

I am using Spark 1.5.1 with SBT and Scala 2.10.6, and I am facing an
issue with the SparkContext.

Basically, I have an object that needs to do several things:

- call an external service One (web api)
- call an external service Two (another api)
- read and produce an RDD from HDFS (Spark)
- parallelize the data obtained in the first two calls
- join these different rdds, do stuff with them...

Now, I am trying to do this asynchronously, but it doesn't seem to work.
My guess is that Spark doesn't see the calls to .parallelize, because they
are made in different tasks (Futures), so this code runs earlier/later than
expected, or maybe with an unset context (can that happen?). I have tried
different approaches, one of them being a call to SparkEnv.set inside the
flatMap and map (i.e. inside the Futures). However, all I get is "Cannot
call methods on a stopped SparkContext". It just doesn't work; maybe I
misunderstood what it does, so I removed it.

This is the code I have written so far:

object Fetcher {

  def fetch(name: String, master: String, ...) = {
    val externalCallOne: Future[WSResponse] = externalService1()
    val externalCallTwo: Future[String] = externalService2()
    // val sparkEnv = SparkEnv.get
    val config = new SparkConf()
      .setAppName(name)
      .set("spark.master", master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sparkContext = new SparkContext(config)
    //val sparkEnv = SparkEnv.get

    val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
      // SparkEnv.set(sparkEnv)
      externalCallTwo map { dataTwo =>
        println("in map") // prints, so it gets here ...
        val rddOne = sparkContext.parallelize(dataOne)
        val rddTwo = sparkContext.parallelize(dataTwo)
        // do stuff here ... foreach/println, and

        val joinedData = rddOne leftOuterJoin (rddTwo)
      }
    }
    eventuallyJoinedData onSuccess { case success => ... }
    eventuallyJoinedData onFailure { case error => println(error.getMessage) }
    // sparkContext.stop
  }

}
As you can see, I have also tried commenting out the line that stops the
context, but then I get another issue:

13:09:14.929 [ForkJoinPool-1-worker-5] INFO  org.apache.spark.SparkContext
- Starting job: count at Fetcher.scala:38
13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop -
Selector.select() returned prematurely because
Thread.currentThread().interrupt() was called. Use
NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner
- Error in cleaning thread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
~[na:1.8.0_65]
    at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
[spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.940 [db-async-netty-thread-1] DEBUG
io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely
because Thread.currentThread().interrupt() was called. Use
NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils -
uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException: null
    at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
~[na:1.8.0_65]
    at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
~[na:1.8.0_65]
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
~[na:1.8.0_65]
    at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65)
~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
~[spark-core_2.10-1.5.1.jar:1.5.1]
    at
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
[spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle -
stopping org.spark-project.jetty.server.Server@787cbcef
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle -
stopping SelectChannelConnector@0.0.0.0:4040
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle -
stopping
org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
As you can see, it tries to run the count operation on the RDD, but then
it fails (possibly because the SparkContext is null?).

How do I address this issue? What needs to be done? Do I need to switch to
a synchronous architecture?

Thanks in advance.

Kind regards,
Marco

Re: Calling SparkContext methods in scala Future

Posted by Marco <ma...@gmail.com>.
Thank you guys for the answers.

@Ted Yu: You are right; in general, the code that fetches data externally
should run separately, and Spark should only access the data written by
these two services via Flume/Kafka/whatever. However, before I get there, I
would like to have the Spark job ready.

@Shixiong Zhu: I imagined something like that, and I must say I suspected
from the beginning that SparkContext could not be called from Futures in
general. It seems I was right about that assumption; trying it gave me the
confirmation I needed. Unfortunately, I don't have a reproducer at hand,
but I would say it's enough to create one Future and call the sparkContext
from there, roughly along the lines of the sketch below.
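
A minimal, untested sketch of the kind of reproducer I mean (the local
master, app name and numbers are just placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}

object Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("future-repro").setMaster("local[*]"))

    // The Future uses the SparkContext asynchronously.
    val job: Future[Long] = Future {
      sc.parallelize(1 to 1000).count()
    }

    // Stopping the context here races with the Future above; if the job
    // has not run yet, it should fail with "Cannot call methods on a
    // stopped SparkContext".
    sc.stop()

    println(Await.result(job, 30.seconds))
  }
}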

Thanks again for the answers.

Kind regards,
Marco


Re: Calling SparkContext methods in scala Future

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Hey Marco,

Since the code in the Future runs asynchronously, you cannot call
"sparkContext.stop" at the end of "fetch": the code in the Future may not
have finished yet when the context is stopped.
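
For example (just a sketch, reusing the names from your snippet), you could
wait for the Future to finish before stopping the context:

import scala.concurrent.Await
import scala.concurrent.duration._

// Block until the asynchronous work has completed, then stop the context.
Await.ready(eventuallyJoinedData, 10.minutes)
sparkContext.stop()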

However, the exception seems weird. Do you have a simple reproducer?



Re: Calling SparkContext methods in scala Future

Posted by Ted Yu <yu...@gmail.com>.
      externalCallTwo map { dataTwo =>
        println("in map") // prints, so it gets here ...
        val rddOne = sparkContext.parallelize(dataOne)

I don't think you should call methods on sparkContext in the map function;
sparkContext lives on the driver side.
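
Something along these lines might be cleaner (rough sketch only; the
element types and the return types of the two services are made up for
illustration):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Resolve the external data on the driver first ...
val externalCallOne: Future[Seq[(String, Int)]] = externalService1()
val externalCallTwo: Future[Seq[(String, Int)]] = externalService2()

val (dataOne, dataTwo) =
  Await.result(externalCallOne zip externalCallTwo, 5.minutes)

// ... and only then touch the SparkContext, outside of any Future.
val rddOne = sparkContext.parallelize(dataOne)
val rddTwo = sparkContext.parallelize(dataTwo)
val joined = rddOne.leftOuterJoin(rddTwo)
println(joined.count())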

Cheers
