You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "Jonnas Li(Contractor)" <zh...@envisioncn.com> on 2017/06/06 09:35:54 UTC

Java SPI jar reload in Spark

I have a Spark Streaming application, which dynamically calling a jar (Java SPI), and the jar is called in a mapWithState() function, it was working fine for a long time.
Recently, I got a requirement which required to reload the jar during runtime.
But when the reloading is completed, the spark streaming job got failed, and I get the following exception, it seems the spark try to deserialize the checkpoint failed.
My question is whether the logic in the jar will be serialized into checkpoint, and is it possible to do the jar reloading during runtime in Spark Streaming?



[2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
 (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)

Thank you in advanced!

Regards
Jonnas Li



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.

Re: Java SPI jar reload in Spark

Posted by Ryan <ry...@gmail.com>.
I'd suggest scripts like js, groovy, etc.. To my understanding the service
loader mechanism isn't a good fit for runtime reloading.

On Wed, Jun 7, 2017 at 4:55 PM, Jonnas Li(Contractor) <
zhongshuang.li@envisioncn.com> wrote:

> To be more explicit, I used mapwithState() in my application, just like
> this:
>
> stream = KafkaUtils.createStream(..)
> mappedStream = stream.mapPartitionToPair(..)
> stateStream = mappedStream.mapwithState(*MyUpdateFunc*(..))
> stateStream.foreachRDD(..)
>
> I call the jar in *MyUpdateFunc()*, and the jar reloading is triggered by
> some other event.
>
> I'm not sure if this approach is feasible. To my understand, Spark will
> checkpoint the status, so the application can’t be updated at runtime,
> that’s why I got the exception.
>
> Any suggestion is welcome, if there is any other idea to do something like
> this, I just want to provide a approach to enable users can customize for
> their business logic.
>
> Regards
> 李忠双 / Jonnas
>
> 发件人: Zhongshuang Li <zh...@envisioncn.com>
> 日期: 2017年6月6日 星期二 下午6:30
> 至: Alonso Isidoro Roman <al...@gmail.com>
>
> 抄送: Jörn Franke <jo...@gmail.com>, "user@spark.apache.org" <
> user@spark.apache.org>
> 主题: Re: Java SPI jar reload in Spark
>
> I used java.util.ServiceLoader<S>
> <https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html>  ,
> as the javadoc says it supports reloading.
> Please point it out if I'm mis-understanding.
>
> Regards
> Jonnas Li
>
> 发件人: Alonso Isidoro Roman <al...@gmail.com>
> 日期: 2017年6月6日 星期二 下午6:21
> 至: Zhongshuang Li <zh...@envisioncn.com>
> 抄送: Jörn Franke <jo...@gmail.com>, "user@spark.apache.org" <
> user@spark.apache.org>
> 主题: Re: Java SPI jar reload in Spark
>
> Hi, a quick search on google.
>
> https://github.com/spark-jobserver/spark-jobserver/issues/130
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>
> 2017-06-06 12:14 GMT+02:00 Jonnas Li(Contractor) <
> zhongshuang.li@envisioncn.com>:
>
>> Thank for your quick response.
>> These jars are used to define some customize business logic, and they can
>> be treat as plug-ins in our business scenario. And the jars are
>> developed/maintain by some third-party partners, this means there will be
>> some version updating.
>> My expectation is update the business code with restarting the spark
>> streaming job, any suggestion will be very grateful.
>>
>> Regards
>> Jonnas Li
>>
>> 发件人: Jörn Franke <jo...@gmail.com>
>> 日期: 2017年6月6日 星期二 下午5:55
>> 至: Zhongshuang Li <zh...@envisioncn.com>
>> 抄送: "user@spark.apache.org" <us...@spark.apache.org>
>> 主题: Re: Java SPI jar reload in Spark
>>
>> Why do you need jar reloading? What functionality is executed during jar
>> reloading. Maybe there is another way to achieve the same without jar
>> reloading. In fact, it might be dangerous from a functional point of view-
>> functionality in jar changed and all your computation is wrong.
>>
>> On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <
>> zhongshuang.li@envisioncn.com> wrote:
>>
>> I have a Spark Streaming application, which dynamically calling a jar
>> (Java SPI), and the jar is called in a mapWithState() function, it was
>> working fine for a long time.
>> Recently, I got a requirement which required to reload the jar during
>> runtime.
>> But when the reloading is completed, the spark streaming job got failed,
>> and I get the following exception, it seems the spark try to deserialize
>> the checkpoint failed.
>> My question is whether the logic in the jar will be serialized into
>> checkpoint, and is it possible to do the jar reloading during runtime in
>> Spark Streaming?
>>
>>
>> [2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
>> 	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>> 	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>> 	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> 	at java.lang.Thread.run(Thread.java:745)
>>  (org.apache.spark.scheduler.TaskSetManager)
>> [2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
>> [2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)
>>
>>
>> Thank you in advanced!
>>
>> Regards
>> Jonnas Li
>>
>>
>>
>> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。
>> 任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因
>> 不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
>>
>> This email message (including any attachments) is confidential and may be
>> legally privileged. If you have received it by mistake, please notify the
>> sender by return email and delete this message from your system. Any
>> unauthorized use or dissemination of this message in whole or in part is
>> strictly prohibited. Envision Energy Limited and all its subsidiaries shall
>> not be liable for the improper or incomplete transmission of the
>> information contained in this email nor for any delay in its receipt or
>> damage to your system. Envision Energy Limited does not guarantee the
>> integrity of this email message, nor that this email message is free of
>> viruses, interceptions, or interference.
>>
>>
>>
>>
>> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。
>> 任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因
>> 不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
>>
>> This email message (including any attachments) is confidential and may be
>> legally privileged. If you have received it by mistake, please notify the
>> sender by return email and delete this message from your system. Any
>> unauthorized use or dissemination of this message in whole or in part is
>> strictly prohibited. Envision Energy Limited and all its subsidiaries shall
>> not be liable for the improper or incomplete transmission of the
>> information contained in this email nor for any delay in its receipt or
>> damage to your system. Envision Energy Limited does not guarantee the
>> integrity of this email message, nor that this email message is free of
>> viruses, interceptions, or interference.
>>
>
>
>
>
> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,
> 严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息
> 以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
> This email message (including any attachments) is confidential and may be
> legally privileged. If you have received it by mistake, please notify the
> sender by return email and delete this message from your system. Any
> unauthorized use or dissemination of this message in whole or in part is
> strictly prohibited. Envision Energy Limited and all its subsidiaries shall
> not be liable for the improper or incomplete transmission of the
> information contained in this email nor for any delay in its receipt or
> damage to your system. Envision Energy Limited does not guarantee the
> integrity of this email message, nor that this email message is free of
> viruses, interceptions, or interference.
>
>
>
> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,
> 严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息
> 以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
> This email message (including any attachments) is confidential and may be
> legally privileged. If you have received it by mistake, please notify the
> sender by return email and delete this message from your system. Any
> unauthorized use or dissemination of this message in whole or in part is
> strictly prohibited. Envision Energy Limited and all its subsidiaries shall
> not be liable for the improper or incomplete transmission of the
> information contained in this email nor for any delay in its receipt or
> damage to your system. Envision Energy Limited does not guarantee the
> integrity of this email message, nor that this email message is free of
> viruses, interceptions, or interference.
>

Re: Java SPI jar reload in Spark

Posted by "Jonnas Li(Contractor)" <zh...@envisioncn.com>.
To be more explicit, I used mapwithState() in my application, just like this:

stream = KafkaUtils.createStream(..)
mappedStream = stream.mapPartitionToPair(..)
stateStream = mappedStream.mapwithState(MyUpdateFunc(..))
stateStream.foreachRDD(..)

I call the jar in MyUpdateFunc(), and the jar reloading is triggered by some other event.

I'm not sure if this approach is feasible. To my understand, Spark will checkpoint the status, so the application can’t be updated at runtime, that’s why I got the exception.

Any suggestion is welcome, if there is any other idea to do something like this, I just want to provide a approach to enable users can customize for their business logic.

Regards
李忠双 / Jonnas

发件人: Zhongshuang Li <zh...@envisioncn.com>>
日期: 2017年6月6日 星期二 下午6:30
至: Alonso Isidoro Roman <al...@gmail.com>>
抄送: Jörn Franke <jo...@gmail.com>>, "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

I used java.util.ServiceLoader<S><https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html>  , as the javadoc says it supports reloading.
Please point it out if I'm mis-understanding.

Regards
Jonnas Li

发件人: Alonso Isidoro Roman <al...@gmail.com>>
日期: 2017年6月6日 星期二 下午6:21
至: Zhongshuang Li <zh...@envisioncn.com>>
抄送: Jörn Franke <jo...@gmail.com>>, "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Hi, a quick search on google.

https://github.com/spark-jobserver/spark-jobserver/issues/130

<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-06-06 12:14 GMT+02:00 Jonnas Li(Contractor) <zh...@envisioncn.com>>:
Thank for your quick response.
These jars are used to define some customize business logic, and they can be treat as plug-ins in our business scenario. And the jars are developed/maintain by some third-party partners, this means there will be some version updating.
My expectation is update the business code with restarting the spark streaming job, any suggestion will be very grateful.

Regards
Jonnas Li

发件人: Jörn Franke <jo...@gmail.com>>
日期: 2017年6月6日 星期二 下午5:55
至: Zhongshuang Li <zh...@envisioncn.com>>
抄送: "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Why do you need jar reloading? What functionality is executed during jar reloading. Maybe there is another way to achieve the same without jar reloading. In fact, it might be dangerous from a functional point of view- functionality in jar changed and all your computation is wrong.

On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <zh...@envisioncn.com>> wrote:

I have a Spark Streaming application, which dynamically calling a jar (Java SPI), and the jar is called in a mapWithState() function, it was working fine for a long time.
Recently, I got a requirement which required to reload the jar during runtime.
But when the reloading is completed, the spark streaming job got failed, and I get the following exception, it seems the spark try to deserialize the checkpoint failed.
My question is whether the logic in the jar will be serialized into checkpoint, and is it possible to do the jar reloading during runtime in Spark Streaming?



[2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com<http://ip-10-21-14-205.envisioncn.com>): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org<http://org.apache.spark.rdd.RDD.org>$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
 (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)

Thank you in advanced!

Regards
Jonnas Li



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.




本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.

Re: Java SPI jar reload in Spark

Posted by "Jonnas Li(Contractor)" <zh...@envisioncn.com>.
I used java.util.ServiceLoader<S><https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html>  , as the javadoc says it supports reloading.
Please point it out if I'm mis-understanding.

Regards
Jonnas Li

发件人: Alonso Isidoro Roman <al...@gmail.com>>
日期: 2017年6月6日 星期二 下午6:21
至: Zhongshuang Li <zh...@envisioncn.com>>
抄送: Jörn Franke <jo...@gmail.com>>, "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Hi, a quick search on google.

https://github.com/spark-jobserver/spark-jobserver/issues/130

<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-06-06 12:14 GMT+02:00 Jonnas Li(Contractor) <zh...@envisioncn.com>>:
Thank for your quick response.
These jars are used to define some customize business logic, and they can be treat as plug-ins in our business scenario. And the jars are developed/maintain by some third-party partners, this means there will be some version updating.
My expectation is update the business code with restarting the spark streaming job, any suggestion will be very grateful.

Regards
Jonnas Li

发件人: Jörn Franke <jo...@gmail.com>>
日期: 2017年6月6日 星期二 下午5:55
至: Zhongshuang Li <zh...@envisioncn.com>>
抄送: "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Why do you need jar reloading? What functionality is executed during jar reloading. Maybe there is another way to achieve the same without jar reloading. In fact, it might be dangerous from a functional point of view- functionality in jar changed and all your computation is wrong.

On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <zh...@envisioncn.com>> wrote:

I have a Spark Streaming application, which dynamically calling a jar (Java SPI), and the jar is called in a mapWithState() function, it was working fine for a long time.
Recently, I got a requirement which required to reload the jar during runtime.
But when the reloading is completed, the spark streaming job got failed, and I get the following exception, it seems the spark try to deserialize the checkpoint failed.
My question is whether the logic in the jar will be serialized into checkpoint, and is it possible to do the jar reloading during runtime in Spark Streaming?



[2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com<http://ip-10-21-14-205.envisioncn.com>): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org<http://org.apache.spark.rdd.RDD.org>$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
 (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)

Thank you in advanced!

Regards
Jonnas Li



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.




本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.

Re: Java SPI jar reload in Spark

Posted by Alonso Isidoro Roman <al...@gmail.com>.
Hi, a quick search on google.

https://github.com/spark-jobserver/spark-jobserver/issues/130

Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

2017-06-06 12:14 GMT+02:00 Jonnas Li(Contractor) <
zhongshuang.li@envisioncn.com>:

> Thank for your quick response.
> These jars are used to define some customize business logic, and they can
> be treat as plug-ins in our business scenario. And the jars are
> developed/maintain by some third-party partners, this means there will be
> some version updating.
> My expectation is update the business code with restarting the spark
> streaming job, any suggestion will be very grateful.
>
> Regards
> Jonnas Li
>
> 发件人: Jörn Franke <jo...@gmail.com>
> 日期: 2017年6月6日 星期二 下午5:55
> 至: Zhongshuang Li <zh...@envisioncn.com>
> 抄送: "user@spark.apache.org" <us...@spark.apache.org>
> 主题: Re: Java SPI jar reload in Spark
>
> Why do you need jar reloading? What functionality is executed during jar
> reloading. Maybe there is another way to achieve the same without jar
> reloading. In fact, it might be dangerous from a functional point of view-
> functionality in jar changed and all your computation is wrong.
>
> On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <
> zhongshuang.li@envisioncn.com> wrote:
>
> I have a Spark Streaming application, which dynamically calling a jar
> (Java SPI), and the jar is called in a mapWithState() function, it was
> working fine for a long time.
> Recently, I got a requirement which required to reload the jar during
> runtime.
> But when the reloading is completed, the spark streaming job got failed,
> and I get the following exception, it seems the spark try to deserialize
> the checkpoint failed.
> My question is whether the logic in the jar will be serialized into
> checkpoint, and is it possible to do the jar reloading during runtime in
> Spark Streaming?
>
>
> [2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
> 	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
> 	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
>  (org.apache.spark.scheduler.TaskSetManager)
> [2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
> [2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)
>
>
> Thank you in advanced!
>
> Regards
> Jonnas Li
>
>
>
> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,
> 严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息
> 以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
> This email message (including any attachments) is confidential and may be
> legally privileged. If you have received it by mistake, please notify the
> sender by return email and delete this message from your system. Any
> unauthorized use or dissemination of this message in whole or in part is
> strictly prohibited. Envision Energy Limited and all its subsidiaries shall
> not be liable for the improper or incomplete transmission of the
> information contained in this email nor for any delay in its receipt or
> damage to your system. Envision Energy Limited does not guarantee the
> integrity of this email message, nor that this email message is free of
> viruses, interceptions, or interference.
>
>
>
>
> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,
> 严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息
> 以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
> This email message (including any attachments) is confidential and may be
> legally privileged. If you have received it by mistake, please notify the
> sender by return email and delete this message from your system. Any
> unauthorized use or dissemination of this message in whole or in part is
> strictly prohibited. Envision Energy Limited and all its subsidiaries shall
> not be liable for the improper or incomplete transmission of the
> information contained in this email nor for any delay in its receipt or
> damage to your system. Envision Energy Limited does not guarantee the
> integrity of this email message, nor that this email message is free of
> viruses, interceptions, or interference.
>

Re: Java SPI jar reload in Spark

Posted by "Jonnas Li(Contractor)" <zh...@envisioncn.com>.
Thank for your quick response.
These jars are used to define some customize business logic, and they can be treat as plug-ins in our business scenario. And the jars are developed/maintain by some third-party partners, this means there will be some version updating.
My expectation is update the business code with restarting the spark streaming job, any suggestion will be very grateful.

Regards
Jonnas Li

发件人: Jörn Franke <jo...@gmail.com>>
日期: 2017年6月6日 星期二 下午5:55
至: Zhongshuang Li <zh...@envisioncn.com>>
抄送: "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Why do you need jar reloading? What functionality is executed during jar reloading. Maybe there is another way to achieve the same without jar reloading. In fact, it might be dangerous from a functional point of view- functionality in jar changed and all your computation is wrong.

On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <zh...@envisioncn.com>> wrote:

I have a Spark Streaming application, which dynamically calling a jar (Java SPI), and the jar is called in a mapWithState() function, it was working fine for a long time.
Recently, I got a requirement which required to reload the jar during runtime.
But when the reloading is completed, the spark streaming job got failed, and I get the following exception, it seems the spark try to deserialize the checkpoint failed.
My question is whether the logic in the jar will be serialized into checkpoint, and is it possible to do the jar reloading during runtime in Spark Streaming?



[2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com<http://ip-10-21-14-205.envisioncn.com>): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
 (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)

Thank you in advanced!

Regards
Jonnas Li



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.

Re: Java SPI jar reload in Spark

Posted by Jörn Franke <jo...@gmail.com>.
Why do you need jar reloading? What functionality is executed during jar reloading. Maybe there is another way to achieve the same without jar reloading. In fact, it might be dangerous from a functional point of view- functionality in jar changed and all your computation is wrong.

> On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <zh...@envisioncn.com> wrote:
> 
> I have a Spark Streaming application, which dynamically calling a jar (Java SPI), and the jar is called in a mapWithState() function, it was working fine for a long time.
> Recently, I got a requirement which required to reload the jar during runtime.
> But when the reloading is completed, the spark streaming job got failed, and I get the following exception, it seems the spark try to deserialize the checkpoint failed.
> My question is whether the logic in the jar will be serialized into checkpoint, and is it possible to do the jar reloading during runtime in Spark Streaming?
> 
> 
> [2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
> 	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
> 	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
> 	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
>  (org.apache.spark.scheduler.TaskSetManager)
> [2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting job (org.apache.spark.scheduler.TaskSetManager)
> [2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)
> 
> Thank you in advanced!
> 
> Regards
> Jonnas Li
> 
> 
> 
> 本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。 
> This email message (including any attachments) is confidential and may be legally privileged. If you have received it by mistake, please notify the sender by return email and delete this message from your system. Any unauthorized use or dissemination of this message in whole or in part is strictly prohibited. Envision Energy Limited and all its subsidiaries shall not be liable for the improper or incomplete transmission of the information contained in this email nor for any delay in its receipt or damage to your system. Envision Energy Limited does not guarantee the integrity of this email message, nor that this email message is free of viruses, interceptions, or interference.