Posted to user@spark.apache.org by Hamish Whittal <ha...@cloud-fundis.co.za> on 2020/03/01 21:56:56 UTC

Hi there,

I have an hdfs directory with thousands of files. It seems that some of
them - and I don't know which ones - have a problem with their schema and
it's causing my Spark application to fail with this error:

Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet
column cannot be converted in file hdfs://
ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-00000-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY

The problem is not only that it's causing the application to fail, but that
every time it does fail, I have to copy that file out of the directory and
start the app again.

I thought of trying to use try-except, but I can't seem to get that to work.

Is there any advice anyone can give me? I really can't see myself going
through thousands of files trying to figure out which ones are broken.

Thanks in advance,

hamish

Re:

Posted by Wim Van Leuven <wi...@highestpoint.biz>.
Hey Hamish,

I don't think there is an 'automatic fix' for this problem ...
Are you reading those as partitions of a single dataset? Or are you
processing them individually?

As your incoming data is apparently not stable, you should implement a
pre-processing step that checks each file and, if necessary, aligns the
faulty datasets with what you expect.
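
For example, a pre-processing step could open just the Parquet footer of each
file and compare its schema to a reference schema before any data is read. A
rough, untested sketch (expectedSchema is a placeholder for whatever you
decide a 'good' file must look like):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val conf = new Configuration()
val pattern = new Path("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
val files = FileSystem.get(pattern.toUri, conf).globStatus(pattern).map(_.getPath)

// expectedSchema is a MessageType you define up front as the reference schema
val (good, bad) = files.partition { p =>
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(p, conf))
  try reader.getFileMetaData.getSchema == expectedSchema
  finally reader.close()
}

// then read only the clean files and quarantine or re-export the rest
// spark.read.parquet(good.map(_.toString): _*)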

HTH
-wim

Re:

Posted by Wim Van Leuven <wi...@highestpoint.biz>.
Ok, good luck!

Re:

Posted by Hamish Whittal <ha...@cloud-fundis.co.za>.
Enrico, Wim (and privately Neil), thanks for the replies. I will give your
suggestions a whirl.

Basically, Wim recommended a pre-processing step to weed out the problematic
files, which I am going to build into the pipeline. I am not sure how the
problems are creeping in, because this is a regular lift from a PGSQL
db/table, and yet some of these files are correct and some are patently
wrong.

I'm working around the problem by trying small subsets of the 3000+ files,
but until I can weed out the problem files the processing is going to fail.
I need something more bulletproof than what I'm doing. So this is what I'm
going to try now.

Hamish

-- 
Cloud-Fundis.co.za
Cape Town, South Africa
+27 79 614 4913

Re:

Posted by Enrico Minack <ma...@Enrico.Minack.dev>.
Looks like the schema of some files is unexpected.

You could either run parquet-tools on each of the files and extract the 
schema to find the problematic files:

# -ls -C prints the full path of each matching file, one per line
hdfs dfs -ls -C hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet |
while read file
do
    echo -n "$file: "
    hadoop jar parquet-tools-1.9.0.jar schema "$file"
done

https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools

Or you can use Spark to investigate the parquet files in parallel:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import spark.implicits._   // for .toDF on the RDD (already in scope in spark-shell)

spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .map { case (path, _) =>
    import collection.JavaConverters._
    // only the Parquet footer is read here, not the data
    val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
    val reader = ParquetFileReader.open(file)
    try {
      val schema = reader.getFileMetaData().getSchema
      (
        schema.getName,
        schema.getFields.asScala.map(f => (
          Option(f.getId).map(_.intValue()),
          f.getName,
          Option(f.getOriginalType).map(_.name()),
          Option(f.getRepetition).map(_.name()))
        ).toArray
      )
    } finally {
      reader.close()
    }
  }
  .toDF("schema name", "fields")
  .show(false)

.binaryFiles provides you with all file names that match the given pattern as
an RDD, so the subsequent .map is executed on the Spark executors.
The map then opens each parquet file via ParquetFileReader and gives you
access to its schema and data.
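
If all you need is the list of offending files, a variation of the above
(again untested, and assuming the column is really called price, as in your
error message) keeps the path and checks just that one field:

// same imports as above, plus:
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

val badFiles = spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .map { case (path, _) =>
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), new Configuration()))
    try {
      val schema = reader.getFileMetaData.getSchema
      // a file is fine if "price" exists and is a plain DOUBLE primitive
      val priceIsDouble = schema.containsField("price") && {
        val t = schema.getType("price")
        t.isPrimitive && t.asPrimitiveType.getPrimitiveTypeName == PrimitiveTypeName.DOUBLE
      }
      (path, priceIsDouble)
    } finally reader.close()
  }
  .filter { case (_, priceOk) => !priceOk }
  .map(_._1)
  .collect()

Everything not in badFiles should then load cleanly, at least as far as the
price column is concerned.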

I hope this points you in the right direction.

Enrico

