Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2020/05/05 15:25:18 UTC
Exception handling in Spark
Hi,
As I understand it, exception handling in Spark only makes sense when one attempts an action, as opposed to lazy transformations. Is that correct?
Let us assume that I am reading an XML file from an HDFS directory and creating a dataframe DF on it:
val broadcastValue = "123456789" // I assume this will be sent as a
constant for the batch
// Create a DF on top of XML
val df = spark.read.
  format("com.databricks.spark.xml").
  option("rootTag", "hierarchy").
  option("rowTag", "sms_request").
  load("/tmp/broadcast.xml")
val newDF = df.withColumn("broadcastid", lit(broadcastValue))
newDF.createOrReplaceTempView("tmp")
// Put data in Hive table
val sqltext = """
INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastid="123456", brand)
SELECT
    ocis_party_id AS partyId
  , target_mobile_no AS phoneNumber
  , brand
  , broadcastid
FROM tmp
"""
// Here I am performing an action
try {
  spark.sql(sqltext)
} catch {
  case e: SQLException =>
    e.printStackTrace()
    sys.exit(1)
}
Now the issue I have is: what if the xml file /tmp/broadcast.xml does not exist or has been deleted? I won't be able to catch the error until the Hive table is populated. Of course I can write a shell script to check that the file exists before running the job, or perform a small action such as df.show(1,0). Are there more general alternatives?
Thanks
Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: Exception handling in Spark
Posted by Brandon Geise <br...@gmail.com>.
Read is an action, so you could wrap it in a Try (or whatever you want)
scala> val df = Try(spark.read.csv("test"))
df: scala.util.Try[org.apache.spark.sql.DataFrame] = Failure(org.apache.spark.sql.AnalysisException: Path does not exist: file:/test;)
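For readers without a Spark shell handy, the same pattern can be sketched in plain Scala; `loadOrThrow` below is a hypothetical stand-in for `spark.read...load(path)`, not Spark's API:

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical stand-in for spark.read.format(...).load(path):
// succeeds for one known path, throws for anything else.
def loadOrThrow(path: String): String =
  if (path == "/tmp/broadcast.xml") "dataframe"
  else throw new IllegalArgumentException(s"Path does not exist: $path")

val ok  = Try(loadOrThrow("/tmp/broadcast.xml")) // Success("dataframe")
val bad = Try(loadOrThrow("/tmp/missing.xml"))   // Failure(IllegalArgumentException)

println(ok.isSuccess)  // true
println(bad.isFailure) // true
```

The Try captures the exception as a value instead of letting it propagate, so the caller can inspect it or fall back.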
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 12:45 PM
To: Brandon Geise <br...@gmail.com>
Cc: "user @spark" <us...@spark.apache.org>
Subject: Re: Exception handling in Spark
Thanks Brandon!
I should have remembered that.
Basically the code exits with sys.exit(1) if it cannot find the file.
I guess there is no easy way of validating a DF except actioning it with show(1,0) etc. and checking whether it works?
Regards,
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 16:41, Brandon Geise <br...@gmail.com> wrote:
You could use the Hadoop API and check if the file exists.
Re: Exception handling in Spark
Posted by Brandon Geise <br...@gmail.com>.
import scala.util.Try
import scala.util.Success
import scala.util.Failure
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 6:11 PM
To: Brandon Geise <br...@gmail.com>
Cc: Todd Nist <ts...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: Re: Exception handling in Spark
This is what I get
scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:47: error: not found: value Try
val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
^
<console>:47: error: not found: value Success
val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
^
<console>:47: error: not found: value Failure
val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 23:03, Brandon Geise <br...@gmail.com> wrote:
This is what I had in mind. Can you give this approach a try?
val df = Try(spark.read.csv("")) match {
case Success(df) => df
case Failure(e) => throw new Exception("foo")
}
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 5:17 PM
To: Todd Nist <ts...@gmail.com>
Cc: Brandon Geise <br...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: Re: Exception handling in Spark
I am trying this approach
val broadcastValue = "123456789" // I assume this will be sent as a constant for the batch
// Create a DF on top of XML
try {
  val df = spark.read.
    format("com.databricks.spark.xml").
    option("rootTag", "hierarchy").
    option("rowTag", "sms_request").
    load("/tmp/broadcast.xml")
  df
} catch {
  case ex: FileNotFoundException =>
    println(s"\nFile /tmp/broadcast.xml not found\n")
    None
  case unknown: Exception =>
    println(s"\n Error encountered $unknown\n")
    None
}
val newDF = df.withColumn("broadcastid", lit(broadcastValue))
But this does not work
scala> try {
| val df = spark.read.
| format("com.databricks.spark.xml").
| option("rootTag", "hierarchy").
| option("rowTag", "sms_request").
| load("/tmp/broadcast.xml")
| Some(df)
| } catch {
| case ex: FileNotFoundException => {
| println (s"\nFile /tmp/broadcast.xml not found\n")
| None
| }
| case unknown: Exception => {
| println(s"\n Error encountered $unknown\n")
| None
| }
| }
res6: Option[org.apache.spark.sql.DataFrame] = Some([brand: string, ocis_party_id: bigint ... 6 more fields])
scala>
scala> df.printSchema
<console>:48: error: not found: value df
df.printSchema
data frame seems to be lost!
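The dataframe is not lost so much as never bound: `val df` lives only inside the `try` block, and the value of the whole `try`/`catch` expression (the `res6` the REPL printed) was never assigned to a name. A Spark-free sketch of the fix, with `load` as a hypothetical stand-in for the XML read:

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the XML load; throws when the path is empty.
def load(path: String): String =
  if (path.nonEmpty) s"df($path)" else throw new RuntimeException("no path")

// Bind the value of the whole try/catch expression to a name;
// a val declared inside the block is invisible afterwards.
val maybeDF: Option[String] =
  try Some(load("/tmp/broadcast.xml"))
  catch { case NonFatal(e) => println(e.getMessage); None }

maybeDF match {
  case Some(df) => println(s"got $df")   // use the dataframe here
  case None     => println("no dataframe, aborting")
}
```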
Thanks,
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 18:08, Mich Talebzadeh <mi...@gmail.com> wrote:
Thanks Todd. This is what I did before creating DF on top of that file
val exists = xmlFileExists(broadcastStagingConfig.xmlFilePath)
if (!exists) {
  println(s"\n Error: The xml file ${broadcastStagingConfig.xmlFilePath} does not exist, aborting!\n")
  sys.exit(1)
}
.
.
def xmlFileExists(hdfsDirectory: String): Boolean = {
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
}
And checked it. It works.
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 17:54, Todd Nist <ts...@gmail.com> wrote:
Could you do something like this prior to calling the action?
// Create FileSystem object from Hadoop Configuration
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// This method returns Boolean (true if the file exists, false if it doesn't)
val fileExists = fs.exists(new Path("<path_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")
Not sure that will help you or not, just a thought.
-Todd
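The same guard can be tried locally without a Hadoop classpath using java.nio (a sketch of the pattern only, not the HDFS FileSystem API):

```scala
import java.nio.file.{Files, Paths}

// Existence check against the local filesystem, mirroring fs.exists(new Path(...))
def fileExists(path: String): Boolean = Files.exists(Paths.get(path))

val path = "/no/such/broadcast.xml" // assumed absent for the demo
if (fileExists(path)) println("File exists!")
else println("File doesn't exist!")
```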
Re: Exception handling in Spark
Posted by Brandon Geise <br...@gmail.com>.
Sure, just do case Failure(e) => throw e
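And the message is available on the captured Throwable via `e.getMessage` before rethrowing; a minimal sketch, with `load` as a hypothetical stand-in for the XML read:

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical stand-in that always fails, as if the path were missing.
def load(path: String): String =
  throw new RuntimeException(s"Path does not exist: $path")

val outcome = Try(load("/tmp/broadcast.xml")) match {
  case Success(_) => "loaded"
  case Failure(e) => e.getMessage // print or log here, then `throw e` if fatal
}
println(outcome) // Path does not exist: /tmp/broadcast.xml
```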
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 6:36 PM
To: Brandon Geise <br...@gmail.com>
Cc: Todd Nist <ts...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: Re: Exception handling in Spark
Hi Brandon.
In dealing with
case Failure(e) => throw new Exception("foo")
can one print the Exception message?
Thanks
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 23:15, Mich Talebzadeh <mi...@gmail.com> wrote:
OK looking promising thanks
scala> import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success, Failure}
scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
df: org.apache.spark.sql.DataFrame = [brand: string, ocis_party_id: bigint ... 6 more fields]
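Once the read is a Try, the explicit match is optional; getOrElse/recover give a fallback without rethrowing. A Spark-free sketch (`load` is a hypothetical stand-in for the XML read):

```scala
import scala.util.Try

// Hypothetical stand-in for the XML load.
def load(path: String): String =
  if (path == "/tmp/broadcast.xml") "df" else throw new RuntimeException(s"no $path")

val fallback = Try(load("/bad/path")).getOrElse("empty-df") // fallback on Failure
val loaded   = Try(load("/tmp/broadcast.xml")).getOrElse("empty-df")
println(fallback) // empty-df
println(loaded)   // df
```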
regards,
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 23:13, Brandon Geise <br...@gmail.com> wrote:
Match needs to be lower case “match”
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 6:13 PM
To: Brandon Geise <br...@gmail.com>
Cc: Todd Nist <ts...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: Re: Exception handling in Spark
scala> import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success, Failure}
scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:48: error: value Match is not a member of scala.util.Try[org.apache.spark.sql.DataFrame]
val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
Mich Talebzadeh
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Brandon.
In dealing with
case Failure(e) => throw new Exception("foo")
can one print the Exception message?
Thanks
Dr Mich Talebzadeh
LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Tue, 5 May 2020 at 23:15, Mich Talebzadeh <mi...@gmail.com>
wrote:
> OK looking promising thanks
>
> scala> import scala.util.{Try, Success, Failure}
> import scala.util.{Try, Success, Failure}
>
> scala> val df =
> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
> match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
> df: org.apache.spark.sql.DataFrame = [brand: string, ocis_party_id: bigint
> ... 6 more fields]
>
>
> regards,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 5 May 2020 at 23:13, Brandon Geise <br...@gmail.com> wrote:
>
>> Match needs to be lower case “match”
>>
>>
>>
>> *From: *Mich Talebzadeh <mi...@gmail.com>
>> *Date: *Tuesday, May 5, 2020 at 6:13 PM
>> *To: *Brandon Geise <br...@gmail.com>
>> *Cc: *Todd Nist <ts...@gmail.com>, "user @spark" <
>> user@spark.apache.org>
>> *Subject: *Re: Exception handling in Spark
>>
>>
>>
>>
>> scala> import scala.util.{Try, Success, Failure}
>>
>> import scala.util.{Try, Success, Failure}
>>
>> scala> val df =
>> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
>> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
>> Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
>> <console>:48: error: value Match is not a member of
>> scala.util.Try[org.apache.spark.sql.DataFrame]
>> val df =
>> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
>> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
>> Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
>>
>>
>>
>> Mich Talebzadeh
>>
>>
>>
>> LinkedIn *https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>>
>>
>> On Tue, 5 May 2020 at 23:10, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> This is what I get
>>
>>
>>
>> scala> val df =
>> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
>> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
>> Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
>> <console>:47: error: not found: value Try
>> val df =
>> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
>> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
>> Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
>> ^
>> <console>:47: error: not found: value Success
>> val df =
>> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
>> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
>> Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
>>
>>
>> ^
>> <console>:47: error: not found: value Failure
>> val df =
>> Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
>> "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
>> Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn *https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>>
>>
>> On Tue, 5 May 2020 at 23:03, Brandon Geise <br...@gmail.com>
>> wrote:
>>
>> This is what I had in mind. Can you give this approach a try?
>>
>>
>>
>> val df = Try(spark.read.csv("")) match {
>>
>> case Success(df) => df
>>
>> case Failure(e) => throw new Exception("foo")
>>
>> }
>>
>>
>>
>> *From: *Mich Talebzadeh <mi...@gmail.com>
>> *Date: *Tuesday, May 5, 2020 at 5:17 PM
>> *To: *Todd Nist <ts...@gmail.com>
>> *Cc: *Brandon Geise <br...@gmail.com>, "user @spark" <
>> user@spark.apache.org>
>> *Subject: *Re: Exception handling in Spark
>>
>>
>>
>> I am trying this approach
>>
>>
>>
>>
>> val broadcastValue = "123456789" // I assume this will be sent as a
>> constant for the batch
>> // Create a DF on top of XML
>> try {
>> val df = spark.read.
>> format("com.databricks.spark.xml").
>> option("rootTag", "hierarchy").
>> option("rowTag", "sms_request").
>> load("/tmp/broadcast.xml")
>> df
>> } catch {
>> case ex: FileNotFoundException => {
>> println (s"\nFile /tmp/broadcast.xml not found\n")
>> None
>> }
>> case unknown: Exception => {
>> println(s"\n Error encountered $unknown\n")
>> None
>> }
>> }
>>
>> val newDF = df.withColumn("broadcastid", lit(broadcastValue))
>>
>> But this does not work
>>
>>
>>
>> scala> try {
>> | val df = spark.read.
>> | format("com.databricks.spark.xml").
>> | option("rootTag", "hierarchy").
>> | option("rowTag", "sms_request").
>> | load("/tmp/broadcast.xml")
>> | Some(df)
>> | } catch {
>> | case ex: FileNotFoundException => {
>> | println (s"\nFile /tmp/broadcast.xml not found\n")
>> | None
>> | }
>> | case unknown: Exception => {
>> | println(s"\n Error encountered $unknown\n")
>> | None
>> | }
>> | }
>> res6: Option[org.apache.spark.sql.DataFrame] = Some([brand: string,
>> ocis_party_id: bigint ... 6 more fields])
>>
>> scala>
>>
>> scala> df.printSchema
>> <console>:48: error: not found: value df
>> df.printSchema
>>
>> data frame seems to be lost!
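[Editor's note] The dataframe is not so much lost as never bound: in the REPL the whole try/catch is an expression, and its value went to the automatic `res6` rather than to a `df` that is visible afterwards. A minimal sketch of binding that expression to a val (the name `maybeDF` is illustrative, not from the thread; this assumes an active SparkSession `spark` and the spark-xml package):

```scala
import java.io.FileNotFoundException
import org.apache.spark.sql.DataFrame

// Bind the whole try/catch expression; both branches yield an Option[DataFrame]
val maybeDF: Option[DataFrame] =
  try {
    Some(spark.read.
      format("com.databricks.spark.xml").
      option("rootTag", "hierarchy").
      option("rowTag", "sms_request").
      load("/tmp/broadcast.xml"))
  } catch {
    case _: FileNotFoundException =>
      println("\nFile /tmp/broadcast.xml not found\n")
      None
  }

// Work with the dataframe only when the read succeeded
maybeDF.foreach(df => df.printSchema)
```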
>>
>>
>>
>> Thanks,
>>
>>
>>
>> On Tue, 5 May 2020 at 18:08, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> Thanks Todd. This is what I did before creating DF on top of that file
>>
>>
>>
>> var exists = true
>>
>> exists = xmlFileExists(broadcastStagingConfig.xmlFilePath)
>>
>> if(!exists) {
>>
>> println(s"\n Error: The xml file ${broadcastStagingConfig.xmlFilePath} does not exist, aborting!\n")
>>
>> sys.exit(1)
>>
>> }
>>
>> .
>>
>> .
>>
>> def xmlFileExists(hdfsDirectory: String): Boolean = {
>>
>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>>
>> val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
>>
>> fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
>>
>> }
>>
>>
>>
>> And checked it. It works.
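[Editor's note] For reference, a sketch of how such a check can guard the read before any dataframe is declared (`xmlFileExists` is the helper defined above; the path is the thread's example):

```scala
// Fail fast with an explicit HDFS existence check before the lazy read
val xmlPath = "/tmp/broadcast.xml"
if (!xmlFileExists(xmlPath)) {
  println(s"\n Error: The xml file $xmlPath does not exist, aborting!\n")
  sys.exit(1)
}
val df = spark.read.
  format("com.databricks.spark.xml").
  option("rootTag", "hierarchy").
  option("rowTag", "sms_request").
  load(xmlPath)
```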
>>
>>
>>
>> On Tue, 5 May 2020 at 17:54, Todd Nist <ts...@gmail.com> wrote:
>>
>> Could you do something like this prior to calling the action.
>>
>>
>>
>> // Create FileSystem object from the Hadoop Configuration
>>
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
>>
>> // This method returns Boolean (true if the file exists, false if it doesn't)
>> val fileExists = fs.exists(new Path("<parh_to_file>"))
>>
>> if (fileExists) println("File exists!")
>> else println("File doesn't exist!")
>>
>>
>>
>> Not sure that will help you or not, just a thought.
>>
>>
>>
>> -Todd
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, May 5, 2020 at 11:45 AM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>> Thanks Brandon!
>>
>>
>>
>> i should have remembered that.
>>
>>
>>
>> basically the code gets out with sys.exit(1) if it cannot find the file
>>
>>
>>
>> I guess there is no easy way of validating DF except actioning it by
>> show(1,0) etc and checking if it works?
>>
>>
>>
>> Regards,
>>
>>
>> On Tue, 5 May 2020 at 16:41, Brandon Geise <br...@gmail.com>
>> wrote:
>>
>> You could use the Hadoop API and check if the file exists.
>>
>>
>>
>> *From: *Mich Talebzadeh <mi...@gmail.com>
>> *Date: *Tuesday, May 5, 2020 at 11:25 AM
>> *To: *"user @spark" <us...@spark.apache.org>
>> *Subject: *Exception handling in Spark
>>
>>
>>
>> Hi,
>>
>>
>>
>> As I understand exception handling in Spark only makes sense if one
>> attempts an action as opposed to lazy transformations?
>>
>>
>>
>> Let us assume that I am reading an XML file from the HDFS directory and
>> create a dataframe DF on it
>>
>>
>>
>> val broadcastValue = "123456789" // I assume this will be sent as a
>> constant for the batch
>>
>> // Create a DF on top of XML
>> val df = spark.read.
>> format("com.databricks.spark.xml").
>> option("rootTag", "hierarchy").
>> option("rowTag", "sms_request").
>> load("/tmp/broadcast.xml")
>>
>> val newDF = df.withColumn("broadcastid", lit(broadcastValue))
>>
>> newDF.createOrReplaceTempView("tmp")
>>
>> // Put data in Hive table
>> //
>> sqltext = """
>> INSERT INTO TABLE michtest.BroadcastStaging PARTITION
>> (broadcastid="123456", brand)
>> SELECT
>> ocis_party_id AS partyId
>> , target_mobile_no AS phoneNumber
>> , brand
>> , broadcastid
>> FROM tmp
>> """
>> //
>>
>> // Here I am performing a collection
>>
>> try {
>>
>> spark.sql(sqltext)
>>
>> } catch {
>>
>> case e: SQLException => e.printStackTrace
>>
>> sys.exit()
>>
>> }
>>
>>
>>
>> Now the issue I have is that what if the xml file /tmp/broadcast.xml
>> does not exist or deleted? I won't be able to catch the error until the
>> hive table is populated. Of course I can write a shell script to check if
>> the file exist before running the job or put small collection like
>> df.show(1,0). Are there more general alternatives?
>>
>>
>>
>> Thanks
>>
>>
>>
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
OK looking promising thanks
scala> import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success, Failure}
scala> val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
df: org.apache.spark.sql.DataFrame = [brand: string, ocis_party_id: bigint
... 6 more fields]
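[Editor's note] A variation on the Failure branch, preserving the underlying error rather than the placeholder `Exception("foo")` from the thread (a sketch, assuming the same SparkSession and spark-xml setup):

```scala
import scala.util.{Try, Success, Failure}

val df = Try(spark.read.format("com.databricks.spark.xml").
    option("rootTag", "hierarchy").
    option("rowTag", "sms_request").
    load("/tmp/broadcast.xml")) match {
  case Success(df) => df
  // Wrap and rethrow so the stack trace keeps the original cause
  case Failure(e) => throw new RuntimeException("Could not load /tmp/broadcast.xml", e)
}
```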
regards,
On Tue, 5 May 2020 at 23:13, Brandon Geise <br...@gmail.com> wrote:
> Match needs to be lower case “match”
Re: Exception handling in Spark
Posted by Brandon Geise <br...@gmail.com>.
Match needs to be lower case “match”
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 6:13 PM
To: Brandon Geise <br...@gmail.com>
Cc: Todd Nist <ts...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: Re: Exception handling in Spark
scala> import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success, Failure}
scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:48: error: value Match is not a member of scala.util.Try[org.apache.spark.sql.DataFrame]
val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
scala> import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success, Failure}
scala> val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:48: error: value Match is not a member of
scala.util.Try[org.apache.spark.sql.DataFrame]
val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
This is what I get
scala> val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:47: error: not found: value Try
val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
^
<console>:47: error: not found: value Success
val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
^
<console>:47: error: not found: value Failure
val df =
Try(spark.read.format("com.databricks.spark.xml").option("rootTag",
"hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml"))
Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
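The three "not found" errors above usually mean that Try, Success and Failure are not imported into the REPL, and that `Match` needs to be the lowercase keyword `match`. A corrected sketch, assuming the spark-xml package is on the classpath:

```scala
import scala.util.{Try, Success, Failure}

val df = Try(spark.read.
    format("com.databricks.spark.xml").
    option("rootTag", "hierarchy").
    option("rowTag", "sms_request").
    load("/tmp/broadcast.xml")) match {
  case Success(df) => df
  case Failure(e)  => throw new Exception("could not load /tmp/broadcast.xml", e)
}
```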
Dr Mich Talebzadeh
Re: Exception handling in Spark
Posted by Brandon Geise <br...@gmail.com>.
This is what I had in mind. Can you give this approach a try?
val df = Try(spark.read.csv("")) match {
case Success(df) => df
case Failure(e) => throw new Exception("foo")
}
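Note that this pattern needs `import scala.util.{Try, Success, Failure}` in scope. The idiom itself is plain Scala, independent of Spark; a minimal sketch:

```scala
import scala.util.{Try, Success, Failure}

// Try captures any exception thrown by the expression; pattern
// matching on the result chooses the success or recovery path.
val parsed = Try("42".toInt) match {
  case Success(n) => n
  case Failure(_) => -1   // fallback when parsing fails
}
// parsed == 42; Try("oops".toInt) would take the Failure branch
```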
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
I am trying this approach
val broadcastValue = "123456789" // I assume this will be sent as a
constant for the batch
// Create a DF on top of XML
try {
val df = spark.read.
format("com.databricks.spark.xml").
option("rootTag", "hierarchy").
option("rowTag", "sms_request").
load("/tmp/broadcast.xml")
df
} catch {
case ex: FileNotFoundException => {
println (s"\nFile /tmp/broadcast.xml not found\n")
None
}
case unknown: Exception => {
println(s"\n Error encountered $unknown\n")
None
}
}
val newDF = df.withColumn("broadcastid", lit(broadcastValue))
But this does not work
scala> try {
| val df = spark.read.
| format("com.databricks.spark.xml").
| option("rootTag", "hierarchy").
| option("rowTag", "sms_request").
| load("/tmp/broadcast.xml")
| Some(df)
| } catch {
| case ex: FileNotFoundException => {
| println (s"\nFile /tmp/broadcast.xml not found\n")
| None
| }
| case unknown: Exception => {
| println(s"\n Error encountered $unknown\n")
| None
| }
| }
res6: Option[org.apache.spark.sql.DataFrame] = Some([brand: string,
ocis_party_id: bigint ... 6 more fields])
scala>
scala> df.printSchema
<console>:48: error: not found: value df
df.printSchema
data frame seems to be lost!
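The data frame is lost because try/catch is an expression in Scala: a val declared inside the block goes out of scope when the block ends. Capturing the value of the whole expression keeps it visible. A sketch along those lines (note that in Spark a missing path typically surfaces as an AnalysisException rather than a FileNotFoundException):

```scala
import java.io.FileNotFoundException
import org.apache.spark.sql.DataFrame

val maybeDF: Option[DataFrame] =
  try {
    Some(spark.read.
      format("com.databricks.spark.xml").
      option("rootTag", "hierarchy").
      option("rowTag", "sms_request").
      load("/tmp/broadcast.xml"))
  } catch {
    case ex: FileNotFoundException =>
      println(s"\nFile /tmp/broadcast.xml not found\n")
      None
    case unknown: Exception =>
      println(s"\n Error encountered $unknown\n")
      None
  }

// The Option is in scope here; map over it or bail out explicitly.
val newDF = maybeDF.map(_.withColumn("broadcastid", lit(broadcastValue)))
```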
Thanks,
Dr Mich Talebzadeh
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Todd. This is what I did before creating DF on top of that file
val exists = xmlFileExists(broadcastStagingConfig.xmlFilePath)
if(!exists) {
println(s"\n Error: The xml file ${broadcastStagingConfig.xmlFilePath}
does not exist, aborting!\n")
sys.exit(1)
}
.
.
def xmlFileExists(hdfsDirectory: String): Boolean = {
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
}
And checked it. It works.
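Putting the existence check and the read together, one possible shape (with the caveat that the file could still be deleted between the check and the load, so the read itself can still throw):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

val xmlFilePath = "/tmp/broadcast.xml"
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (!fs.exists(new Path(xmlFilePath))) {
  println(s"\n Error: The xml file $xmlFilePath does not exist, aborting!\n")
  sys.exit(1)
}
// The path existed a moment ago; proceed to build the DataFrame.
val df = spark.read.
  format("com.databricks.spark.xml").
  option("rootTag", "hierarchy").
  option("rowTag", "sms_request").
  load(xmlFilePath)
```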
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 17:54, Todd Nist <ts...@gmail.com> wrote:
> Could you do something like this prior to calling the action?
>
> // Create FileSystem object from Hadoop Configuration
> val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
> // This method returns Boolean (true if the file exists, false if it doesn't)
> val fileExists = fs.exists(new Path("<path_to_file>"))
> if (fileExists) println("File exists!")
> else println("File doesn't exist!")
>
> Not sure that will help you or not, just a thought.
>
> -Todd
Re: Exception handling in Spark
Posted by Todd Nist <ts...@gmail.com>.
Could you do something like this prior to calling the action?
// Create FileSystem object from Hadoop Configuration
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// This method returns Boolean (true if the file exists, false if it doesn't)
val fileExists = fs.exists(new Path("<path_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")
Not sure that will help you or not, just a thought.
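Putting that together with the read, something like the following sketch (untested; the path and the spark-xml options are taken from the original message, and `spark` is assumed to be a live SparkSession):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Check existence first, then read only if the file is actually there.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val xmlPath = new Path("/tmp/broadcast.xml")

if (!fs.exists(xmlPath)) {
  println(s"$xmlPath does not exist, aborting")
  sys.exit(1)
}

val df = spark.read.
  format("com.databricks.spark.xml").
  option("rootTag", "hierarchy").
  option("rowTag", "sms_request").
  load(xmlPath.toString)
```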
-Todd
On Tue, May 5, 2020 at 11:45 AM Mich Talebzadeh <mi...@gmail.com>
wrote:
> Thanks Brandon!
>
> i should have remembered that.
>
> basically the code gets out with sys.exit(1) if it cannot find the file
>
> I guess there is no easy way of validating DF except actioning it by
> show(1,0) etc and checking if it works?
>
> Regards,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 5 May 2020 at 16:41, Brandon Geise <br...@gmail.com> wrote:
>
>> You could use the Hadoop API and check if the file exists.
>>
>>
>>
>> *From: *Mich Talebzadeh <mi...@gmail.com>
>> *Date: *Tuesday, May 5, 2020 at 11:25 AM
>> *To: *"user @spark" <us...@spark.apache.org>
>> *Subject: *Exception handling in Spark
>>
>>
>>
>> Hi,
>>
>>
>>
>> As I understand exception handling in Spark only makes sense if one
>> attempts an action as opposed to lazy transformations?
>>
>>
>>
>> Let us assume that I am reading an XML file from the HDFS directory and
>> create a dataframe DF on it
>>
>>
>>
>> val broadcastValue = "123456789" // I assume this will be sent as a
>> constant for the batch
>>
>> // Create a DF on top of XML
>> val df = spark.read.
>> format("com.databricks.spark.xml").
>> option("rootTag", "hierarchy").
>> option("rowTag", "sms_request").
>> load("/tmp/broadcast.xml")
>>
>> val newDF = df.withColumn("broadcastid", lit(broadcastValue))
>>
>> newDF.createOrReplaceTempView("tmp")
>>
>> // Put data in Hive table
>> //
>> sqltext = """
>> INSERT INTO TABLE michtest.BroadcastStaging PARTITION
>> (broadcastid="123456", brand)
>> SELECT
>> ocis_party_id AS partyId
>> , target_mobile_no AS phoneNumber
>> , brand
>> , broadcastid
>> FROM tmp
>> """
>> //
>>
>> // Here I am performing a collection
>>
>> try {
>>
>> spark.sql(sqltext)
>>
>> } catch {
>>
>> case e: SQLException => e.printStackTrace
>>
>> sys.exit()
>>
>> }
>>
>>
>>
>> Now the issue I have is that what if the xml file /tmp/broadcast.xml
>> does not exist or deleted? I won't be able to catch the error until the
>> hive table is populated. Of course I can write a shell script to check if
>> the file exist before running the job or put small collection like
>> df.show(1,0). Are there more general alternatives?
>>
>>
>>
>> Thanks
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn *https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
Re: Exception handling in Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Brandon!
I should have remembered that.
Basically the code exits with sys.exit(1) if it cannot find the file.
I guess there is no easy way of validating a DF except actioning it by
show(1,0) etc. and checking if it works?
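That validation can at least be kept cheap, along these lines (a sketch, untested): limit(1) means the action materialises at most one row, and Try turns any failure into a value instead of an uncaught exception.

```scala
import scala.util.Try

// Sketch: `df` is the DataFrame created from the XML file earlier.
// limit(1).count() is a cheap action: it reads at most one row, so it
// surfaces read errors early without scanning the whole file.
val dfIsReadable = Try(df.limit(1).count()).isSuccess
if (!dfIsReadable) {
  println("Error: the DataFrame could not be evaluated, aborting")
  sys.exit(1)
}
```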
Regards,
Dr Mich Talebzadeh
On Tue, 5 May 2020 at 16:41, Brandon Geise <br...@gmail.com> wrote:
> You could use the Hadoop API and check if the file exists.
Re: Exception handling in Spark
Posted by Brandon Geise <br...@gmail.com>.
You could use the Hadoop API and check if the file exists.
From: Mich Talebzadeh <mi...@gmail.com>
Date: Tuesday, May 5, 2020 at 11:25 AM
To: "user @spark" <us...@spark.apache.org>
Subject: Exception handling in Spark