Posted to user@spark.apache.org by "Balachandar R.A." <ba...@gmail.com> on 2016/07/13 16:44:48 UTC

Issue in spark job. Remote rpc client dissociated

Hello

In one of my use cases, I need to process a list of folders in parallel. I
used
sc.parallelize(list, list.size).map("logic to process the folder").
I have a six-node cluster and there are six folders to process. Ideally, I
expect each node to process one folder. Instead, I see that one node
processes multiple folders while one or two of the nodes do not get any work.
In the end, spark-submit crashes with an exception saying "Remote RPC
client disassociated". Can someone give me a hint on what's going wrong here?

Please note that this issue does not arise if I comment out the logic that
processes the folder and simply print the folder name. In that case, every
node gets one folder to process. I also inserted a sleep of 40 seconds inside
the map, and there was still no issue. But when I uncomment my logic, I see
the issue again. Also, before crashing, the job does process some of the
folders successfully. "Successfully" means the business logic generates a
file in a shared file system.

Regards
Bala
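
A note on sc.parallelize(list, list.size): the second argument sets how many partitions (and therefore tasks) are created, not which node runs them. The sketch below is plain Java with no Spark dependency. It assumes an even contiguous split (start = i * n / numSlices, end = (i + 1) * n / numSlices), which is how parallelize is understood to slice a collection; the class name and folder names are hypothetical. With six folders and six slices, every partition holds exactly one folder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: illustrates an even contiguous split.
final class SliceSketch {

    // Splits items into numSlices contiguous partitions.
    static <T> List<List<T>> slice(List<T> items, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be >= 1");
        }
        int n = items.size();
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numSlices; i++) {
            // Use long arithmetic so that i * n cannot overflow.
            int start = (int) ((long) i * n / numSlices);
            int end = (int) ((long) (i + 1) * n / numSlices);
            partitions.add(new ArrayList<>(items.subList(start, end)));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical folder names standing in for the six input folders.
        List<String> folders = Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6");
        List<List<String>> parts = slice(folders, folders.size());
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + " holds " + parts.get(i));
        }
    }
}
```

One folder per partition only fixes the task count. Each task is handed to whichever executor has a free core, so a worker that offers several cores can run several of the six tasks at once while another worker stays idle.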

Re: Issue in spark job. Remote rpc client dissociated

Posted by "Balachandar R.A." <ba...@gmail.com>.
Hello,

The variable argsList is an array defined above the parallel block. This
variable is accessed inside the map function. Launcher.main is not
thread-safe. Is it not possible to tell Spark that every folder needs to be
processed as a separate process in a separate working directory?

Regards
Bala
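
One way to get a separate process and a separate working directory per folder, independent of any Spark setting, is to start a child JVM from inside the map function with java.lang.ProcessBuilder. The sketch below is only an illustration under assumptions: the class name IsolatedLauncher, the classpath argument, and the example paths are hypothetical, and "Launcher" is assumed to be the fully qualified name of the black-box class. The classpath is passed in explicitly because the jar that contains Launcher may not be on the executor's own java.class.path.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: runs a main class in its own JVM and working directory.
final class IsolatedLauncher {

    // Builds (but does not start) the child-JVM command.
    static ProcessBuilder build(String classPath, String mainClass,
                                File workDir, String[] args) {
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        List<String> cmd = new ArrayList<>();
        cmd.add(javaBin);
        cmd.add("-cp");
        cmd.add(classPath);
        cmd.add(mainClass);
        for (String arg : args) {
            cmd.add(arg);
        }
        ProcessBuilder pb = new ProcessBuilder(cmd);
        pb.directory(workDir);   // each folder gets its own working directory
        pb.inheritIO();          // child output goes to the parent's stdout/stderr
        return pb;
    }

    // Creates the working directory if needed, runs the child, returns its exit code.
    static int run(String classPath, String mainClass,
                   File workDir, String[] args)
            throws IOException, InterruptedException {
        if (!workDir.isDirectory() && !workDir.mkdirs()) {
            throw new IOException("Cannot create working directory " + workDir);
        }
        return build(classPath, mainClass, workDir, args).start().waitFor();
    }

    public static void main(String[] unused) {
        // Hypothetical values; this only prints the command, it does not run it.
        ProcessBuilder pb = build("/path/to/tool.jar", "Launcher",
                new File("work/20161307"), new String[] {"arg0", "arg1"});
        System.out.println(pb.directory() + " : " + pb.command());
    }
}
```

Inside call(File file), run(...) would take the place of the direct Launcher.main(argsList) call, and its exit code could be returned as the status. Because each child has its own heap and statics, this sidesteps the thread-safety question at the cost of one JVM start-up per folder.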
On 14-Jul-2016 2:37 pm, "Sun Rui" <su...@163.com> wrote:

> Where is argsList defined? is Launcher.main() thread-safe? Note that if
> multiple folders are processed in a node, multiple threads may concurrently
> run in the executor, each processing a folder.

Re: Issue in spark job. Remote rpc client dissociated

Posted by Sun Rui <su...@163.com>.
Where is argsList defined? Is Launcher.main() thread-safe? Note that if multiple folders are processed on one node, multiple threads may run concurrently in the executor, each processing a folder.
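
To make the concern concrete: the snippet in this thread appends to argsList[3] and argsList[7] in place, so every further call that sees the same array instance appends to paths that already contain a folder name. Whether task threads really share one instance depends on how argsList is declared, which the thread does not show. A minimal sketch of a safer pattern, assuming the same index layout (3 = input base path, 7 = output base path), is to build a fresh array per call. TaskArgs and forFolder are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical helper: derives per-folder arguments without touching the shared array.
final class TaskArgs {

    // Returns a copy of baseArgs with the input and output paths set for one folder.
    static String[] forFolder(String[] baseArgs, String folder) {
        if (baseArgs.length < 8) {
            throw new IllegalArgumentException("expected at least 8 arguments");
        }
        String[] copy = Arrays.copyOf(baseArgs, baseArgs.length);
        copy[3] = baseArgs[3] + "/" + folder;           // full input path
        copy[7] = baseArgs[7] + "/" + folder + ".csv";  // full output path
        return copy;
    }

    public static void main(String[] unused) {
        // Hypothetical base arguments; only indices 3 and 7 matter here.
        String[] base = {"a0", "a1", "a2", "/home/user/software/data",
                         "a4", "a5", "a6", "/home/user/software/out"};
        String[] first = forFolder(base, "20161307");
        String[] second = forFolder(base, "20161407");
        System.out.println(first[3] + " -> " + first[7]);
        System.out.println(second[3] + " -> " + second[7]);
        System.out.println("base unchanged: " + base[3]);
    }
}
```

This only removes the shared mutable array. It does not make Launcher.main() itself thread-safe, so concurrent calls inside one executor JVM can still interfere with each other.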

> On Jul 14, 2016, at 12:28, Balachandar R.A. <ba...@gmail.com> wrote:
>
> Hello Ted,
>
> Thanks for the response. Here is the additional information.
> [...]


Re: Issue in spark job. Remote rpc client dissociated

Posted by "Balachandar R.A." <ba...@gmail.com>.
Hello Ted,

Thanks for the response. Here is the additional information.

I am using Spark 1.6.1 (spark-1.6.1-bin-hadoop2.6).

Here is the code snippet:

    JavaRDD<File> add = jsc.parallelize(listFolders, listFolders.size());
    JavaRDD<Integer> test = add.map(new Function<File, Integer>() {
        @Override
        public Integer call(File file) throws Exception {
            String folder = file.getName();
            System.out.println("[x] Processing dataset from the directory " + folder);
            int status = 0;
            // Full path of the input folder. The input folder is in a shared
            // file system that every worker node has access to, something
            // like (“/home/user/software/data/”), and the folder name will
            // be like (“20161307”).
            argsList[3] = argsList[3] + "/" + folder;
            // Full path of the output.
            argsList[7] = argsList[7] + "/" + folder + ".csv";
            try {
                // The Launcher class is a black box. It processes the input
                // folder and creates a CSV file in the output location
                // (argsList[7]). This is also in a shared file system.
                Launcher.main(argsList);
                status = 0;
            } catch (Exception e) {
                System.out.println("[x] Execution of import tool for the directory " + folder + " failed");
                status = 0;
            }
            accum.add(1);
            return status;
        }
    });

Here is the spark-env.sh:

    export SPARK_WORKER_INSTANCES=1
    export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
    export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop

Here is the spark-defaults.conf:

    spark.master                     spark://master:7077
    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs://master:9000/sparkEvent
    spark.serializer                 org.apache.spark.serializer.KryoSerializer
    spark.driver.memory              4g

Hope it helps.

Re: Issue in spark job. Remote rpc client dissociated

Posted by Ted Yu <yu...@gmail.com>.
Which Spark release are you using?

Can you share what the folder processing does (a code snippet would be best)?

Thanks
