You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Gourav Sengupta <go...@gmail.com> on 2022/01/01 06:06:48 UTC

Re: Issue Communicating with Driver, RpcTimeoutException

Hi,

please search for SPARK configurations page, and there is a setting to
increase this.

the code snippet that you have sent is not complete I think. I generally
read files into a dataframe, and then run query to extract first row or
first N rows.

Regards,
Gourav Sengupta

On Thu, Dec 30, 2021 at 7:38 PM Thinh Nguyen <tn...@dtechspace.com> wrote:

> When running my spark-submit application, the following error occurs:
>
>
> This error occurs when I try to read and load a file from a directory:
>
> *spark.read.csv(url+"/thresholds/*.csv").head.getString(0).toDouble*
>
> There is only one csv file and it contains only one text value. The job
> always hang at this part of my code. If I continue to let the job run,
> it’ll eventually failed with a java heap out of memory error.
>
> To provide some more context in case this is a memory-related error, here
> is the code that is ran before trying to read the csv file:
>
> -----------------------------
> *if* (doesTableExist) {
>
>
>         *var* tableName: String = formatTable("pplr");
>
>
>         *var* dfLoad = sparkSession.read
>                           .format("jdbc")
>                                  .option("url",
> postProcessor.getDestinationURL())
>                                  .option("driver",
> postProcessor.getDBDriver())
>                                  .option("user",
> postProcessor.getAuthUserName())
>                                  .option("password",
> postProcessor.getAuthUserPassword())
>                                  .option("query", "select timestamp from
> \"" + tableName + "\" order by timestamp desc limit 1")
>                                  .load()
>
>
>         *var* timeStampVal = dfLoad.select(col("timestamp")).collect()
>
>
>         lastTimestamp = timeStampVal(0)(0).asInstanceOf[Double]
>
> }
>
> *val* dfReader = sparkSession.read.format(fileformat)
>
>
> *var* inputDF = dfReader.schema(schemaFormat).load(dataSource.getURL())
>
>
> *var* distanceDF = pipelinemodel.transform(inputDF)
>
> *val* modelWithStage = findKMeansStage(pipelineModel)
> *val* findDistance = udf({feature: Vector => distToCentroid(feature,
> modelWithStage)})
> *val* resultDF = distanceDF.withColumn("distances", findDistance(col(
> "features")))
>
> --------------------
>
> Here are additional details regarding the submitted job:
> *Currently using:* Spark 3.1.2, Scala 2.12, Java 11
> *Spark Cluster Spec:* 8 workers, 48 cores, 64GB Memory
> *Application Submitted Spec:* 1 worker, 4 driver and executor cores, 8GB
> driver and executor memory
>
>

Re: Issue Communicating with Driver, RpcTimeoutException

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

cannot agree more with Mich, the REPL is one of the best tools for
debugging - at least for me personally  and never thought anyone else used
it much.

When I am using PYSPARK, I use PyCharm in debugging mode, but REPL is
always useful :)

I have though faced the issue that Sam has mentioned before, we just try to
read a few lines, and SPARK sometimes throws errors when the files are more
*spark.read.csv(url+"/thresholds/*.csv").head.getString(0).toDouble*

One of the things that I did was to read just one file, and infer schema
from there, and then read all the files by applying the schema which we
inferred by reading one of the files, but it was a long time back, perhaps
that approach still works.

Regards,
Gourav


On Sat, Jan 1, 2022 at 4:34 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

>
> Guys,
>
>
> My inclination would be to read the CSV file in REPL to see if it exists
> and will not throw an error
>
>
> scala> import scala.util.{Try, Success, Failure}
>
> import scala.util.{Try, Success, Failure}
>
>
> scala> val df = Try(spark.read.option("header", true).
>
>      | csv("hdfs://<HOST>:9000/data/stg/domains/analytics")
>
>      | )match {case Success(df) => df
>
>      | case Failure(e) => throw new Exception("Error Encountered reading
> the csv file")}
>
> change parameters as needed
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 1 Jan 2022 at 13:40, Gourav Sengupta <go...@gmail.com>
> wrote:
>
>> Hi,
>>
>> my sincere apologies, I was under the severe wrong assumption that we all
>> can select "spark configuration" and right click on our browser and then
>> click on the first link appearing in Google search.
>>
>> Since you clearly cannot do Google search on "Spark configuration" click
>> on the first link and search for rpc, I am attaching the link here:
>> https://spark.apache.org/docs/latest/configuration.html#networking. Hope
>> that helps.
>>
>> There are several rpc configurations, I am making a massive assumption
>> that we can all read english and figure out that the last configuration
>> relates to rpc connection time out. If you cannot please do let me know.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sat, Jan 1, 2022 at 9:29 AM sam smith <qu...@gmail.com>
>> wrote:
>>
>>> Dear Gourav,
>>>
>>> As i myself interested with the issue, please we are here to ask
>>> question not to be answered with "search for Spark configuration page":
>>> that we already know. Clarify with details if you know or else please
>>> refrain from these kind of answers
>>>
>>> Thanks for your understanding
>>>
>>> Le sam. 1 janv. 2022 à 07:07, Gourav Sengupta <go...@gmail.com>
>>> a écrit :
>>>
>>>> Hi,
>>>>
>>>> please search for SPARK configurations page, and there is a setting to
>>>> increase this.
>>>>
>>>> the code snippet that you have sent is not complete I think. I
>>>> generally read files into a dataframe, and then run query to extract first
>>>> row or first N rows.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Thu, Dec 30, 2021 at 7:38 PM Thinh Nguyen <tn...@dtechspace.com>
>>>> wrote:
>>>>
>>>>> When running my spark-submit application, the following error occurs:
>>>>>
>>>>>
>>>>> This error occurs when I try to read and load a file from a directory:
>>>>>
>>>>> *spark.read.csv(url+"/thresholds/*.csv").head.getString(0).toDouble*
>>>>>
>>>>> There is only one csv file and it contains only one text value. The
>>>>> job always hang at this part of my code. If I continue to let the job run,
>>>>> it’ll eventually failed with a java heap out of memory error.
>>>>>
>>>>> To provide some more context in case this is a memory-related error,
>>>>> here is the code that is ran before trying to read the csv file:
>>>>>
>>>>> -----------------------------
>>>>> *if* (doesTableExist) {
>>>>>
>>>>>
>>>>>         *var* tableName: String = formatTable("pplr");
>>>>>
>>>>>
>>>>>         *var* dfLoad = sparkSession.read
>>>>>                           .format("jdbc")
>>>>>                                  .option("url",
>>>>> postProcessor.getDestinationURL())
>>>>>                                  .option("driver",
>>>>> postProcessor.getDBDriver())
>>>>>                                  .option("user",
>>>>> postProcessor.getAuthUserName())
>>>>>                                  .option("password",
>>>>> postProcessor.getAuthUserPassword())
>>>>>                                  .option("query", "select timestamp
>>>>> from \"" + tableName + "\" order by timestamp desc limit 1")
>>>>>                                  .load()
>>>>>
>>>>>
>>>>>         *var* timeStampVal = dfLoad.select(col("timestamp")).collect()
>>>>>
>>>>>
>>>>>         lastTimestamp = timeStampVal(0)(0).asInstanceOf[Double]
>>>>>
>>>>> }
>>>>>
>>>>> *val* dfReader = sparkSession.read.format(fileformat)
>>>>>
>>>>>
>>>>> *var* inputDF =
>>>>> dfReader.schema(schemaFormat).load(dataSource.getURL())
>>>>>
>>>>>
>>>>> *var* distanceDF = pipelinemodel.transform(inputDF)
>>>>>
>>>>> *val* modelWithStage = findKMeansStage(pipelineModel)
>>>>> *val* findDistance = udf({feature: Vector => distToCentroid(feature,
>>>>> modelWithStage)})
>>>>> *val* resultDF = distanceDF.withColumn("distances", findDistance(col(
>>>>> "features")))
>>>>>
>>>>> --------------------
>>>>>
>>>>> Here are additional details regarding the submitted job:
>>>>> *Currently using:* Spark 3.1.2, Scala 2.12, Java 11
>>>>> *Spark Cluster Spec:* 8 workers, 48 cores, 64GB Memory
>>>>> *Application Submitted Spec:* 1 worker, 4 driver and executor cores,
>>>>> 8GB driver and executor memory
>>>>>
>>>>>

Re: Issue Communicating with Driver, RpcTimeoutException

Posted by Mich Talebzadeh <mi...@gmail.com>.
Guys,


My inclination would be to read the CSV file in REPL to see if it exists
and will not throw an error


scala> import scala.util.{Try, Success, Failure}

import scala.util.{Try, Success, Failure}


scala> val df = Try(spark.read.option("header", true).

     | csv("hdfs://<HOST>:9000/data/stg/domains/analytics")

     | )match {case Success(df) => df

     | case Failure(e) => throw new Exception("Error Encountered reading
the csv file")}

change parameters as needed


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 1 Jan 2022 at 13:40, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> my sincere apologies, I was under the severe wrong assumption that we all
> can select "spark configuration" and right click on our browser and then
> click on the first link appearing in Google search.
>
> Since you clearly cannot do Google search on "Spark configuration" click
> on the first link and search for rpc, I am attaching the link here:
> https://spark.apache.org/docs/latest/configuration.html#networking. Hope
> that helps.
>
> There are several rpc configurations, I am making a massive assumption
> that we can all read english and figure out that the last configuration
> relates to rpc connection time out. If you cannot please do let me know.
>
> Regards,
> Gourav Sengupta
>
> On Sat, Jan 1, 2022 at 9:29 AM sam smith <qu...@gmail.com>
> wrote:
>
>> Dear Gourav,
>>
>> As i myself interested with the issue, please we are here to ask question
>> not to be answered with "search for Spark configuration page": that we
>> already know. Clarify with details if you know or else please refrain from
>> these kind of answers
>>
>> Thanks for your understanding
>>
>> Le sam. 1 janv. 2022 à 07:07, Gourav Sengupta <go...@gmail.com>
>> a écrit :
>>
>>> Hi,
>>>
>>> please search for SPARK configurations page, and there is a setting to
>>> increase this.
>>>
>>> the code snippet that you have sent is not complete I think. I generally
>>> read files into a dataframe, and then run query to extract first row or
>>> first N rows.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Thu, Dec 30, 2021 at 7:38 PM Thinh Nguyen <tn...@dtechspace.com>
>>> wrote:
>>>
>>>> When running my spark-submit application, the following error occurs:
>>>>
>>>>
>>>> This error occurs when I try to read and load a file from a directory:
>>>>
>>>> *spark.read.csv(url+"/thresholds/*.csv").head.getString(0).toDouble*
>>>>
>>>> There is only one csv file and it contains only one text value. The job
>>>> always hang at this part of my code. If I continue to let the job run,
>>>> it’ll eventually failed with a java heap out of memory error.
>>>>
>>>> To provide some more context in case this is a memory-related error,
>>>> here is the code that is ran before trying to read the csv file:
>>>>
>>>> -----------------------------
>>>> *if* (doesTableExist) {
>>>>
>>>>
>>>>         *var* tableName: String = formatTable("pplr");
>>>>
>>>>
>>>>         *var* dfLoad = sparkSession.read
>>>>                           .format("jdbc")
>>>>                                  .option("url",
>>>> postProcessor.getDestinationURL())
>>>>                                  .option("driver",
>>>> postProcessor.getDBDriver())
>>>>                                  .option("user",
>>>> postProcessor.getAuthUserName())
>>>>                                  .option("password",
>>>> postProcessor.getAuthUserPassword())
>>>>                                  .option("query", "select timestamp
>>>> from \"" + tableName + "\" order by timestamp desc limit 1")
>>>>                                  .load()
>>>>
>>>>
>>>>         *var* timeStampVal = dfLoad.select(col("timestamp")).collect()
>>>>
>>>>
>>>>         lastTimestamp = timeStampVal(0)(0).asInstanceOf[Double]
>>>>
>>>> }
>>>>
>>>> *val* dfReader = sparkSession.read.format(fileformat)
>>>>
>>>>
>>>> *var* inputDF = dfReader.schema(schemaFormat).load(dataSource.getURL())
>>>>
>>>>
>>>> *var* distanceDF = pipelinemodel.transform(inputDF)
>>>>
>>>> *val* modelWithStage = findKMeansStage(pipelineModel)
>>>> *val* findDistance = udf({feature: Vector => distToCentroid(feature,
>>>> modelWithStage)})
>>>> *val* resultDF = distanceDF.withColumn("distances", findDistance(col(
>>>> "features")))
>>>>
>>>> --------------------
>>>>
>>>> Here are additional details regarding the submitted job:
>>>> *Currently using:* Spark 3.1.2, Scala 2.12, Java 11
>>>> *Spark Cluster Spec:* 8 workers, 48 cores, 64GB Memory
>>>> *Application Submitted Spec:* 1 worker, 4 driver and executor cores,
>>>> 8GB driver and executor memory
>>>>
>>>>

Re: Issue Communicating with Driver, RpcTimeoutException

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

my sincere apologies, I was under the severe wrong assumption that we all
can select "spark configuration" and right click on our browser and then
click on the first link appearing in Google search.

Since you clearly cannot do Google search on "Spark configuration" click on
the first link and search for rpc, I am attaching the link here:
https://spark.apache.org/docs/latest/configuration.html#networking. Hope
that helps.

There are several rpc configurations, I am making a massive assumption that
we can all read english and figure out that the last configuration relates
to rpc connection time out. If you cannot please do let me know.

Regards,
Gourav Sengupta

On Sat, Jan 1, 2022 at 9:29 AM sam smith <qu...@gmail.com> wrote:

> Dear Gourav,
>
> As i myself interested with the issue, please we are here to ask question
> not to be answered with "search for Spark configuration page": that we
> already know. Clarify with details if you know or else please refrain from
> these kind of answers
>
> Thanks for your understanding
>
> Le sam. 1 janv. 2022 à 07:07, Gourav Sengupta <go...@gmail.com>
> a écrit :
>
>> Hi,
>>
>> please search for SPARK configurations page, and there is a setting to
>> increase this.
>>
>> the code snippet that you have sent is not complete I think. I generally
>> read files into a dataframe, and then run query to extract first row or
>> first N rows.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Dec 30, 2021 at 7:38 PM Thinh Nguyen <tn...@dtechspace.com>
>> wrote:
>>
>>> When running my spark-submit application, the following error occurs:
>>>
>>>
>>> This error occurs when I try to read and load a file from a directory:
>>>
>>> *spark.read.csv(url+"/thresholds/*.csv").head.getString(0).toDouble*
>>>
>>> There is only one csv file and it contains only one text value. The job
>>> always hang at this part of my code. If I continue to let the job run,
>>> it’ll eventually failed with a java heap out of memory error.
>>>
>>> To provide some more context in case this is a memory-related error,
>>> here is the code that is ran before trying to read the csv file:
>>>
>>> -----------------------------
>>> *if* (doesTableExist) {
>>>
>>>
>>>         *var* tableName: String = formatTable("pplr");
>>>
>>>
>>>         *var* dfLoad = sparkSession.read
>>>                           .format("jdbc")
>>>                                  .option("url",
>>> postProcessor.getDestinationURL())
>>>                                  .option("driver",
>>> postProcessor.getDBDriver())
>>>                                  .option("user",
>>> postProcessor.getAuthUserName())
>>>                                  .option("password",
>>> postProcessor.getAuthUserPassword())
>>>                                  .option("query", "select timestamp
>>> from \"" + tableName + "\" order by timestamp desc limit 1")
>>>                                  .load()
>>>
>>>
>>>         *var* timeStampVal = dfLoad.select(col("timestamp")).collect()
>>>
>>>
>>>         lastTimestamp = timeStampVal(0)(0).asInstanceOf[Double]
>>>
>>> }
>>>
>>> *val* dfReader = sparkSession.read.format(fileformat)
>>>
>>>
>>> *var* inputDF = dfReader.schema(schemaFormat).load(dataSource.getURL())
>>>
>>>
>>> *var* distanceDF = pipelinemodel.transform(inputDF)
>>>
>>> *val* modelWithStage = findKMeansStage(pipelineModel)
>>> *val* findDistance = udf({feature: Vector => distToCentroid(feature,
>>> modelWithStage)})
>>> *val* resultDF = distanceDF.withColumn("distances", findDistance(col(
>>> "features")))
>>>
>>> --------------------
>>>
>>> Here are additional details regarding the submitted job:
>>> *Currently using:* Spark 3.1.2, Scala 2.12, Java 11
>>> *Spark Cluster Spec:* 8 workers, 48 cores, 64GB Memory
>>> *Application Submitted Spec:* 1 worker, 4 driver and executor cores, 8GB
>>> driver and executor memory
>>>
>>>

Re: Issue Communicating with Driver, RpcTimeoutException

Posted by sam smith <qu...@gmail.com>.
Dear Gourav,

As i myself interested with the issue, please we are here to ask question
not to be answered with "search for Spark configuration page": that we
already know. Clarify with details if you know or else please refrain from
these kind of answers

Thanks for your understanding

Le sam. 1 janv. 2022 à 07:07, Gourav Sengupta <go...@gmail.com> a
écrit :

> Hi,
>
> please search for SPARK configurations page, and there is a setting to
> increase this.
>
> the code snippet that you have sent is not complete I think. I generally
> read files into a dataframe, and then run query to extract first row or
> first N rows.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Dec 30, 2021 at 7:38 PM Thinh Nguyen <tn...@dtechspace.com>
> wrote:
>
>> When running my spark-submit application, the following error occurs:
>>
>>
>> This error occurs when I try to read and load a file from a directory:
>>
>> *spark.read.csv(url+"/thresholds/*.csv").head.getString(0).toDouble*
>>
>> There is only one csv file and it contains only one text value. The job
>> always hang at this part of my code. If I continue to let the job run,
>> it’ll eventually failed with a java heap out of memory error.
>>
>> To provide some more context in case this is a memory-related error, here
>> is the code that is ran before trying to read the csv file:
>>
>> -----------------------------
>> *if* (doesTableExist) {
>>
>>
>>         *var* tableName: String = formatTable("pplr");
>>
>>
>>         *var* dfLoad = sparkSession.read
>>                           .format("jdbc")
>>                                  .option("url",
>> postProcessor.getDestinationURL())
>>                                  .option("driver",
>> postProcessor.getDBDriver())
>>                                  .option("user",
>> postProcessor.getAuthUserName())
>>                                  .option("password",
>> postProcessor.getAuthUserPassword())
>>                                  .option("query", "select timestamp from
>> \"" + tableName + "\" order by timestamp desc limit 1")
>>                                  .load()
>>
>>
>>         *var* timeStampVal = dfLoad.select(col("timestamp")).collect()
>>
>>
>>         lastTimestamp = timeStampVal(0)(0).asInstanceOf[Double]
>>
>> }
>>
>> *val* dfReader = sparkSession.read.format(fileformat)
>>
>>
>> *var* inputDF = dfReader.schema(schemaFormat).load(dataSource.getURL())
>>
>>
>> *var* distanceDF = pipelinemodel.transform(inputDF)
>>
>> *val* modelWithStage = findKMeansStage(pipelineModel)
>> *val* findDistance = udf({feature: Vector => distToCentroid(feature,
>> modelWithStage)})
>> *val* resultDF = distanceDF.withColumn("distances", findDistance(col(
>> "features")))
>>
>> --------------------
>>
>> Here are additional details regarding the submitted job:
>> *Currently using:* Spark 3.1.2, Scala 2.12, Java 11
>> *Spark Cluster Spec:* 8 workers, 48 cores, 64GB Memory
>> *Application Submitted Spec:* 1 worker, 4 driver and executor cores, 8GB
>> driver and executor memory
>>
>>