Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/10/20 06:38:59 UTC

[jira] [Comment Edited] (PIG-4920) Fail to use Javascript UDF in spark yarn client mode

    [ https://issues.apache.org/jira/browse/PIG-4920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15590984#comment-15590984 ] 

liyunzhang_intel edited comment on PIG-4920 at 10/20/16 6:38 AM:
-----------------------------------------------------------------

[~mohitsabharwal]: I uploaded PIG-4920_6.patch; please help review. Modifications:
1. Added more comments on SparkEngineConf#writeObject.


Here is more explanation of the following code:
 {code}

    if (!UDFContext.getUDFContext().isUDFConfEmpty()) {
        ...
    } else {
        out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS));
        out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS));
    }
{code}

*2* threads call SparkEngineConf#writeObject:

*main* thread: SparkLauncher#initialize->
         SparkUtil#newJobConf->
         ObjectSerializer#serialize->
         SparkEngineConf#writeObject

     In the main thread, UDFContext is not empty.

*dag-scheduler-event-loop* thread: DAGScheduler.submitMissingTasks->JavaSerializationStream.writeObject
         In dag-scheduler-event-loop, UDFContext is empty because UDFContext#getUDFContext returns a thread-local instance. At that point, however, this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS) is not empty because we initialized it in the main thread, so the else branch writes the saved values.
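
The two-thread behaviour can be reproduced outside Pig with a minimal Java sketch. Everything in it is hypothetical and only for illustration: the class ThreadLocalConfSketch stands in for SparkEngineConf, the UDF_CONF thread-local stands in for UDFContext, and the key and value strings are made up. The body of the if branch is elided in the snippet above, so saving the value into properties there is an assumption of this sketch, not a description of the patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-in for SparkEngineConf; not the real Pig class.
class ThreadLocalConfSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stand-in for the per-thread UDFContext: each thread sees its own value.
    static final ThreadLocal<String> UDF_CONF = ThreadLocal.withInitial(() -> "");

    // Made-up key playing the role of SPARK_UDFCONTEXT_UDFCONFS.
    static final String UDFCONFS_KEY = "sketch.udfcontext.udfconfs";

    // Shared across threads, so a value saved by one thread is visible to another.
    private final transient Properties properties = new Properties();
    private transient String restored;

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        String conf = UDF_CONF.get();
        if (!conf.isEmpty()) {
            // Thread that populated the context: remember the value, then write it.
            properties.setProperty(UDFCONFS_KEY, conf);
            out.writeObject(conf);
        } else {
            // Any other thread: the thread-local is empty, fall back to the saved value.
            out.writeObject(properties.getProperty(UDFCONFS_KEY));
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        restored = (String) in.readObject();
    }

    // Serializes and deserializes conf on the calling thread, returning what was written.
    static String roundTrip(ThreadLocalConfSketch conf) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(conf);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return ((ThreadLocalConfSketch) in.readObject()).restored;
        }
    }

    // Returns {written by calling thread, thread-local seen by second thread, written by second thread}.
    static String[] demo() throws Exception {
        ThreadLocalConfSketch conf = new ThreadLocalConfSketch();
        UDF_CONF.set("example-udf-conf");
        String fromMain = roundTrip(conf);

        String[] fromOther = new String[2];
        Thread t = new Thread(() -> {
            try {
                fromOther[0] = UDF_CONF.get();   // thread-local was never set on this thread
                fromOther[1] = roundTrip(conf);  // takes the else branch
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "second-thread");
        t.start();
        t.join();
        UDF_CONF.remove();
        return new String[] { fromMain, fromOther[0], fromOther[1] };
    }

    public static void main(String[] args) throws Exception {
        String[] r = demo();
        System.out.println("main thread wrote:         " + r[0]);
        System.out.println("second thread thread-local: '" + r[1] + "'");
        System.out.println("second thread wrote:       " + r[2]);
    }
}
```

In this sketch the second thread sees an empty thread-local but still serializes the value the first thread saved, which is the situation the else branch is meant to handle.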

The stack trace of the dag-scheduler-event-loop thread looks like the following:
{code}
dag-scheduler-event-loop@10914 daemon, prio=5, in group 'main', status: 'RUNNING'
      at org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf.writeObject(SparkEngineConf.java:84)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:483)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:483)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:483)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
      at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1003)
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
      at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

{code}


> Fail to use Javascript UDF in spark yarn client mode
> ----------------------------------------------------
>
>                 Key: PIG-4920
>                 URL: https://issues.apache.org/jira/browse/PIG-4920
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4920.patch, PIG-4920_2.patch, PIG-4920_3.patch, PIG-4920_4.patch, PIG-4920_5.patch, PIG-4920_6.patch
>
>
> udf.pig 
> {code}
> register '/home/zly/prj/oss/merge.pig/pig/bin/udf.js' using javascript as myfuncs;
> A = load './passwd' as (a0:chararray, a1:chararray);
> B = foreach A generate myfuncs.helloworld();
> store B into './udf.out';
> {code}
> udf.js
> {code}
> helloworld.outputSchema = "word:chararray";
> function helloworld() {
>     return 'Hello, World';
> }
>     
> complex.outputSchema = "word:chararray";
> function complex(word){
>     return {word:word};
> }
> {code}
> Running udf.pig in spark local mode (export SPARK_MASTER="local") succeeds.
> Running udf.pig in spark yarn client mode (export SPARK_MASTER="yarn-client") fails with an error message like the following:
> {noformat}
> Caused by: java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>         at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:744)
>         ... 84 more
> Caused by: java.lang.ExceptionInInitializerError
>         at org.apache.pig.scripting.js.JsScriptEngine.getInstance(JsScriptEngine.java:87)
>         at org.apache.pig.scripting.js.JsFunction.<init>(JsFunction.java:173)
>         ... 89 more
> Caused by: java.lang.IllegalStateException: could not get script path from UDFContext
>         at org.apache.pig.scripting.js.JsScriptEngine$Holder.<clinit>(JsScriptEngine.java:69)
>         ... 91 more
> {noformat}


