Posted to dev@spark.apache.org by Yang Lei <ge...@gmail.com> on 2015/04/24 23:38:02 UTC

Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

Forwarding to dev.

On Mon, Apr 20, 2015 at 10:46 AM, Yang Lei <ge...@gmail.com> wrote:

> I implemented two kinds of DataSource: one loads data during buildScan,
> the other returns my RDD class with partition information for later
> loading.
>
> My RDD's compute gets the actorSystem from SparkEnv.get.actorSystem, then
> uses Spray to interact with an HTTP endpoint, which is the same flow as
> loading data in buildScan. All the Spray dependencies are included in a
> jar that is passed to spark-submit using --jars.
>
> The job is defined in Python.
>
> Both scenarios work when tested locally using --master local[4]. On Mesos,
> the non-partitioned loading works too, but the partitioned loading hits the
> following exception.
>
> Traceback (most recent call last):
>   File "/root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py", line 78, in <module>
>     for code in airportData.collect():
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py", line 293, in collect
>     port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o60.javaToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage 36.0 (TID 147, 198.11.207.72): com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'spray'
>     at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
>     at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218)
>     at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224)
>     at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33)
>     at spray.can.HttpExt.<init>(Http.scala:143)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>     at scala.util.Try$.apply(Try.scala:161)
>     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>     at akka.actor.ExtensionKey.createExtension(Extension.scala:153)
>     at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)
>     at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>     at akka.actor.ExtensionKey.apply(Extension.scala:149)
>     at akka.io.IO$.apply(IO.scala:30)
>     at spray.client.pipelining$.sendReceive(pipelining.scala:35)
>     at com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118)
>     at com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71)
>     at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Is this due to some kind of classpath setup issue on the executor for the
> external jar that handles the RDD?
>
> Thanks in advance for any suggestions on how to resolve this.
>
> Yang
>

Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

Posted by Yang Lei <ge...@gmail.com>.
I finally isolated the issue: it is related to the ActorSystem I reuse from
SparkEnv.get.actorSystem. This ActorSystem contains the configuration
defined in my application jar's reference.conf both when running locally
and when I use it directly in the buildScan method of my BaseRelation
extension. However, when it is used in my RDD, which is returned from
buildScan, the configuration is missing.

I worked around the problem by checking whether my configuration exists in
SparkEnv.get.actorSystem.settings.config. If it does not, I create a new
ActorSystem using my class's classLoader to force the config to be read
from my application jar:

            import akka.actor.ActorSystem
            import com.typesafe.config.ConfigFactory

            val classLoader = this.getClass.getClassLoader

            // force config reading from my classloader
            val myconfig = ConfigFactory.load(classLoader)

            ActorSystem("somename..", myconfig, classLoader)


I wonder whether this different behavior of SparkEnv.get.actorSystem is
working as designed, or whether something is missing in the executor setup
for this custom-RDD-driven execution case.


Thanks.


Yang

Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

Posted by Yang Lei <ge...@gmail.com>.
The configuration is in the jar I passed in. If I do not create my own RDD for partitioned loading, everything is fine, and in that case the task also runs in the executor, right? So it seems that some special call path before my RDD's compute is triggered makes the configuration 'lost'.

I will try to see if I can debug further. Any insight into this special call path would be appreciated.
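
One way to narrow this down, as a minimal sketch only: it assumes the 'spray' key goes missing because the classloader used to load the configuration cannot see the reference.conf shipped in the application jar. ConfigVisibility and its two methods are hypothetical helpers, not part of Spark, Akka, or Spray; they use only the JDK's ClassLoader.getResources.

```scala
import java.net.URL

// Hypothetical diagnostic helper: lists every copy of a resource
// (by default reference.conf) that a given classloader can see.
object ConfigVisibility {
  def visibleResources(loader: ClassLoader, name: String = "reference.conf"): List[URL] =
    if (loader == null) Nil
    else {
      val found = loader.getResources(name)
      val urls = List.newBuilder[URL]
      while (found.hasMoreElements) urls += found.nextElement()
      urls.result()
    }

  // One summary line, followed by one indented line per URL found.
  def report(label: String, loader: ClassLoader): String = {
    val urls = visibleResources(loader)
    s"$label: ${urls.size} reference.conf file(s) visible" +
      urls.map(u => s"\n  $u").mkString
  }
}
```

Calling ConfigVisibility.report once with Thread.currentThread.getContextClassLoader and once with getClass.getClassLoader, from both buildScan and the RDD's compute, would show whether the application jar's reference.conf is visible to each loader in each call path.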

Yang

> On Apr 24, 2015, at 8:14 PM, Reynold Xin <rx...@databricks.com> wrote:
> 
> This looks like a specific Spray configuration issue (or how Spray reads config files). Maybe Spray is reading some local config file that doesn't exist on your executors? 
> 
> You might need to email the Spray list.

Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

Posted by Reynold Xin <rx...@databricks.com>.
This looks like a specific Spray configuration issue (or how Spray reads
config files). Maybe Spray is reading some local config file that doesn't
exist on your executors?

You might need to email the Spray list.

