Posted to user@spark.apache.org by Adam Gilmore <dr...@gmail.com> on 2014/12/22 06:11:14 UTC

Parquet schema changes

Hi all,

I understand that parquet allows for schema versioning automatically in the
format; however, I'm not sure whether Spark supports this.

I'm saving a SchemaRDD to a parquet file, registering it as a table, then
doing an insertInto with a SchemaRDD with an extra column.

The second SchemaRDD does in fact get inserted, but the extra column isn't
present when I try to query it with Spark SQL.

Is there anything I can do to get this working how I'm hoping?
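
A minimal sketch of the flow being described, against the Spark 1.2
SchemaRDD API (paths, table name, and columns are illustrative, and a
spark-shell-style sc is assumed; this is not the actual code in question):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// First batch: two columns.
val v1 = sqlContext.jsonRDD(sc.parallelize(Seq("""{"id": 1, "name": "a"}""")))
v1.saveAsParquetFile("/data/t.parquet")

// Register the Parquet directory as a table.
sqlContext.parquetFile("/data/t.parquet").registerTempTable("t")

// Second batch: carries an extra column.
val v2 = sqlContext.jsonRDD(
  sc.parallelize(Seq("""{"id": 2, "name": "b", "extra": 42}""")))
v2.insertInto("t")

// The new rows land as another part-file, but the extra column is not
// visible when querying -- the behaviour described above.
sqlContext.sql("SELECT * FROM t").printSchema()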

Re: Parquet schema changes

Posted by Adam Gilmore <dr...@gmail.com>.
Fantastic - glad to see that it's in the pipeline!

On Wed, Jan 7, 2015 at 11:27 AM, Michael Armbrust <mi...@databricks.com>
wrote:

> I want to support this but we don't yet.  Here is the JIRA:
> https://issues.apache.org/jira/browse/SPARK-3851
>
> On Tue, Jan 6, 2015 at 5:23 PM, Adam Gilmore <dr...@gmail.com>
> wrote:
>
>> Has anyone got any further thoughts on this?  I saw that the _metadata file
>> seems to store the schema of every single part (i.e. file) in the parquet
>> directory, so in theory it should be possible.
>>
>> Effectively, our use case is that we receive a stack of JSON and want to
>> encode it to Parquet for high performance, but there is the potential for new
>> fields to be added to the JSON structure, so we want to be able to handle
>> that every time we encode to Parquet (we'll be doing it "incrementally" for
>> performance).
>>
>> On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore <dr...@gmail.com>
>> wrote:
>>
>>> I saw that in the source, which is why I was wondering.
>>>
>>> I was mainly reading:
>>>
>>> http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/
>>>
>>> "A query that tries to parse the organizationId and userId from the 2
>>> logTypes should be able to do so correctly, though they are positioned
>>> differently in the schema. With Parquet, it’s not a problem. It will merge
>>> ‘A’ and ‘V’ schemas and project columns accordingly. It does so by
>>> maintaining a file schema in addition to merged schema and parsing the
>>> columns by referencing the 2."
>>>
>>> I know that each part file can have its own schema, but I saw in the
>>> implementation for Spark, if there was no metadata file, it'd just pick the
>>> first file and use that schema across the board.  I'm not quite sure how
>>> other implementations like Impala etc. deal with this, but I was really
>>> hoping there'd be a way to "version" the schema as new records are added
>>> and just project it through.
>>>
>>> Would be a godsend for semi-structured data.
>>>
>>> On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian <li...@gmail.com>
>>> wrote:
>>>
>>>>  I must have missed something important here; could you please provide more
>>>> detail on Parquet “schema versioning”? I wasn’t aware of this feature (which
>>>> sounds really useful).
>>>>
>>>> Specifically, are you referring to the following scenario:
>>>>
>>>>    1. Write some data whose schema is A to “t.parquet”, resulting in a
>>>>    file “t.parquet/parquet-r-1.part” on HDFS
>>>>    2. Append more data whose schema B “contains” A, but has more
>>>>    columns, to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part”
>>>>    on HDFS
>>>>    3. Now read “t.parquet”, and schemas A and B are expected to be
>>>>    merged
>>>>
>>>> If this is the case, then current Spark SQL doesn’t support this. We
>>>> assume schemas of all data within a single Parquet file (which is an HDFS
>>>> directory with multiple part-files) are identical.
>>>>
>>>> On 12/22/14 1:11 PM, Adam Gilmore wrote:
>>>>
>>>>    Hi all,
>>>>
>>>>  I understand that parquet allows for schema versioning automatically
>>>> in the format; however, I'm not sure whether Spark supports this.
>>>>
>>>>  I'm saving a SchemaRDD to a parquet file, registering it as a table,
>>>> then doing an insertInto with a SchemaRDD with an extra column.
>>>>
>>>>  The second SchemaRDD does in fact get inserted, but the extra column
>>>> isn't present when I try to query it with Spark SQL.
>>>>
>>>>  Is there anything I can do to get this working how I'm hoping?
>>>>
>>>>
>>>
>>>
>>
>

Re: Parquet schema changes

Posted by Michael Armbrust <mi...@databricks.com>.
I want to support this but we don't yet.  Here is the JIRA:
https://issues.apache.org/jira/browse/SPARK-3851

On Tue, Jan 6, 2015 at 5:23 PM, Adam Gilmore <dr...@gmail.com> wrote:

> Has anyone got any further thoughts on this?  I saw that the _metadata file
> seems to store the schema of every single part (i.e. file) in the parquet
> directory, so in theory it should be possible.
>
> Effectively, our use case is that we receive a stack of JSON and want to
> encode it to Parquet for high performance, but there is the potential for new
> fields to be added to the JSON structure, so we want to be able to handle
> that every time we encode to Parquet (we'll be doing it "incrementally" for
> performance).
>
> On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore <dr...@gmail.com>
> wrote:
>
>> I saw that in the source, which is why I was wondering.
>>
>> I was mainly reading:
>>
>> http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/
>>
>> "A query that tries to parse the organizationId and userId from the 2
>> logTypes should be able to do so correctly, though they are positioned
>> differently in the schema. With Parquet, it’s not a problem. It will merge
>> ‘A’ and ‘V’ schemas and project columns accordingly. It does so by
>> maintaining a file schema in addition to merged schema and parsing the
>> columns by referencing the 2."
>>
>> I know that each part file can have its own schema, but I saw in the
>> implementation for Spark, if there was no metadata file, it'd just pick the
>> first file and use that schema across the board.  I'm not quite sure how
>> other implementations like Impala etc. deal with this, but I was really
>> hoping there'd be a way to "version" the schema as new records are added
>> and just project it through.
>>
>> Would be a godsend for semi-structured data.
>>
>> On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian <li...@gmail.com>
>> wrote:
>>
>>>  I must have missed something important here; could you please provide more
>>> detail on Parquet “schema versioning”? I wasn’t aware of this feature (which
>>> sounds really useful).
>>>
>>> Specifically, are you referring to the following scenario:
>>>
>>>    1. Write some data whose schema is A to “t.parquet”, resulting in a
>>>    file “t.parquet/parquet-r-1.part” on HDFS
>>>    2. Append more data whose schema B “contains” A, but has more
>>>    columns, to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part”
>>>    on HDFS
>>>    3. Now read “t.parquet”, and schemas A and B are expected to be merged
>>>
>>> If this is the case, then current Spark SQL doesn’t support this. We
>>> assume schemas of all data within a single Parquet file (which is an HDFS
>>> directory with multiple part-files) are identical.
>>>
>>> On 12/22/14 1:11 PM, Adam Gilmore wrote:
>>>
>>>    Hi all,
>>>
>>>  I understand that parquet allows for schema versioning automatically
>>> in the format; however, I'm not sure whether Spark supports this.
>>>
>>>  I'm saving a SchemaRDD to a parquet file, registering it as a table,
>>> then doing an insertInto with a SchemaRDD with an extra column.
>>>
>>>  The second SchemaRDD does in fact get inserted, but the extra column
>>> isn't present when I try to query it with Spark SQL.
>>>
>>>  Is there anything I can do to get this working how I'm hoping?
>>>
>>>
>>
>>
>

Re: Parquet schema changes

Posted by Adam Gilmore <dr...@gmail.com>.
Has anyone got any further thoughts on this?  I saw that the _metadata file
seems to store the schema of every single part (i.e. file) in the parquet
directory, so in theory it should be possible.

Effectively, our use case is that we receive a stack of JSON and want to
encode it to Parquet for high performance, but there is the potential for new
fields to be added to the JSON structure, so we want to be able to handle
that every time we encode to Parquet (we'll be doing it "incrementally" for
performance).

On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore <dr...@gmail.com> wrote:

> I saw that in the source, which is why I was wondering.
>
> I was mainly reading:
>
> http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/
>
> "A query that tries to parse the organizationId and userId from the 2
> logTypes should be able to do so correctly, though they are positioned
> differently in the schema. With Parquet, it’s not a problem. It will merge
> ‘A’ and ‘V’ schemas and project columns accordingly. It does so by
> maintaining a file schema in addition to merged schema and parsing the
> columns by referencing the 2."
>
> I know that each part file can have its own schema, but I saw in the
> implementation for Spark, if there was no metadata file, it'd just pick the
> first file and use that schema across the board.  I'm not quite sure how
> other implementations like Impala etc. deal with this, but I was really
> hoping there'd be a way to "version" the schema as new records are added
> and just project it through.
>
> Would be a godsend for semi-structured data.
>
> On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian <li...@gmail.com> wrote:
>
>>  I must have missed something important here; could you please provide more
>> detail on Parquet “schema versioning”? I wasn’t aware of this feature (which
>> sounds really useful).
>>
>> Specifically, are you referring to the following scenario:
>>
>>    1. Write some data whose schema is A to “t.parquet”, resulting in a file
>>    “t.parquet/parquet-r-1.part” on HDFS
>>    2. Append more data whose schema B “contains” A, but has more columns,
>>    to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part” on HDFS
>>    3. Now read “t.parquet”, and schemas A and B are expected to be merged
>>
>> If this is the case, then current Spark SQL doesn’t support this. We
>> assume schemas of all data within a single Parquet file (which is an HDFS
>> directory with multiple part-files) are identical.
>>
>> On 12/22/14 1:11 PM, Adam Gilmore wrote:
>>
>>    Hi all,
>>
>>  I understand that parquet allows for schema versioning automatically in
>> the format; however, I'm not sure whether Spark supports this.
>>
>>  I'm saving a SchemaRDD to a parquet file, registering it as a table,
>> then doing an insertInto with a SchemaRDD with an extra column.
>>
>>  The second SchemaRDD does in fact get inserted, but the extra column
>> isn't present when I try to query it with Spark SQL.
>>
>>  Is there anything I can do to get this working how I'm hoping?
>>
>>
>
>

Re: Parquet schema changes

Posted by Adam Gilmore <dr...@gmail.com>.
I saw that in the source, which is why I was wondering.

I was mainly reading:

http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/

"A query that tries to parse the organizationId and userId from the 2
logTypes should be able to do so correctly, though they are positioned
differently in the schema. With Parquet, it’s not a problem. It will merge
‘A’ and ‘V’ schemas and project columns accordingly. It does so by
maintaining a file schema in addition to merged schema and parsing the
columns by referencing the 2."

I know that each part file can have its own schema, but I saw in the
implementation for Spark, if there was no metadata file, it'd just pick the
first file and use that schema across the board.  I'm not quite sure how
other implementations like Impala etc. deal with this, but I was really
hoping there'd be a way to "version" the schema as new records are added
and just project it through.

Would be a godsend for semi-structured data.
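
In the meantime, one manual workaround (purely a sketch, with made-up paths,
a hand-written "merged" schema, and the assumption that the older files hold
just id and name; Spark does none of this for you) is to pad the older,
narrower data out to the newer schema and union the pieces at read time:

import org.apache.spark.sql._   // StructType, StructField, Row, etc. in Spark 1.2

// Hand-written schema covering both versions of the data.
val wideSchema = StructType(Seq(
  StructField("id",    IntegerType, nullable = true),
  StructField("name",  StringType,  nullable = true),
  StructField("extra", StringType,  nullable = true)))  // only in newer files

// Pad the old (narrower) files with nulls for the missing column, then
// union with the new files and query the combined result.
val oldPadded = sqlContext.applySchema(
  sqlContext.parquetFile("/data/v1.parquet").map(r => Row(r(0), r(1), null)),
  wideSchema)
val newData = sqlContext.parquetFile("/data/v2.parquet")

oldPadded.unionAll(newData).registerTempTable("merged")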

On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian <li...@gmail.com> wrote:

>  I must have missed something important here; could you please provide more
> detail on Parquet “schema versioning”? I wasn’t aware of this feature (which
> sounds really useful).
>
> Specifically, are you referring to the following scenario:
>
>    1. Write some data whose schema is A to “t.parquet”, resulting in a file
>    “t.parquet/parquet-r-1.part” on HDFS
>    2. Append more data whose schema B “contains” A, but has more columns,
>    to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part” on HDFS
>    3. Now read “t.parquet”, and schemas A and B are expected to be merged
>
> If this is the case, then current Spark SQL doesn’t support this. We
> assume schemas of all data within a single Parquet file (which is an HDFS
> directory with multiple part-files) are identical.
>
> On 12/22/14 1:11 PM, Adam Gilmore wrote:
>
>    Hi all,
>
>  I understand that parquet allows for schema versioning automatically in
> the format; however, I'm not sure whether Spark supports this.
>
>  I'm saving a SchemaRDD to a parquet file, registering it as a table,
> then doing an insertInto with a SchemaRDD with an extra column.
>
>  The second SchemaRDD does in fact get inserted, but the extra column
> isn't present when I try to query it with Spark SQL.
>
>  Is there anything I can do to get this working how I'm hoping?
>
>

Re: Parquet schema changes

Posted by Cheng Lian <li...@gmail.com>.
I must have missed something important here; could you please provide more
detail on Parquet “schema versioning”? I wasn’t aware of this feature
(which sounds really useful).

Specifically, are you referring to the following scenario:

 1. Write some data whose schema is A to “t.parquet”, resulting in a file
    “t.parquet/parquet-r-1.part” on HDFS
 2. Append more data whose schema B “contains” A, but has more columns,
    to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part”
    on HDFS
 3. Now read “t.parquet”, and schemas A and B are expected to be merged

If this is the case, then current Spark SQL doesn’t support this. We 
assume schemas of all data within a single Parquet file (which is an 
HDFS directory with multiple part-files) are identical.
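
For concreteness, the kind of merge being asked about, as a hand-rolled
sketch (this is not an existing Spark API; the field names are examples):

import org.apache.spark.sql._   // StructType / StructField in Spark 1.2

// Schema A, and a wider schema B that "contains" A.
val schemaA = StructType(Seq(
  StructField("id", IntegerType, nullable = true)))
val schemaB = StructType(schemaA.fields ++ Seq(
  StructField("extra", StringType, nullable = true)))

// A naive merge: keep A's fields, append any field of B not already present.
def mergeSchemas(a: StructType, b: StructType): StructType = {
  val existing = a.fields.map(_.name).toSet
  StructType(a.fields ++ b.fields.filterNot(f => existing.contains(f.name)))
}

mergeSchemas(schemaA, schemaB)   // => StructType with fields id, extra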

On 12/22/14 1:11 PM, Adam Gilmore wrote:

> Hi all,
>
> I understand that parquet allows for schema versioning automatically 
> in the format; however, I'm not sure whether Spark supports this.
>
> I'm saving a SchemaRDD to a parquet file, registering it as a table, 
> then doing an insertInto with a SchemaRDD with an extra column.
>
> The second SchemaRDD does in fact get inserted, but the extra column 
> isn't present when I try to query it with Spark SQL.
>
> Is there anything I can do to get this working how I'm hoping?

Re: Parquet schema changes

Posted by Yana Kadiyska <ya...@gmail.com>.
So, there might be a shorter path to success; I'd be curious too. What I
was able to do is:

1. Create the RDD
2. Apply a schema that is 1 column wider
3. Register it as a table
4. Insert new data with the 1 extra column

I believe you'd have to do step 2 -- if you're inserting into an existing
schema and you have extra columns, it's logical that they get dropped. In a
scenario where this is done over time, you'd have a step 1a where you
register your table; once your schema grows, you'd have to register the
table again, this time from a SchemaRDD that has more columns.
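
A rough sketch of those steps against the Spark 1.2 applySchema/Row API
(the column names and path are illustrative, and the wider schema is assumed
to be known up front):

import org.apache.spark.sql._

// 1a. The wider schema: the current columns plus the one expected later.
val wideSchema = StructType(Seq(
  StructField("id",    IntegerType, nullable = true),
  StructField("name",  StringType,  nullable = true),
  StructField("extra", StringType,  nullable = true)))  // not populated yet

// 1-2. Build the RDD and apply the wider schema, padding the missing column
// with nulls.
val v1 = sqlContext.applySchema(
  sc.parallelize(Seq(Row(1, "a", null), Row(2, "b", null))), wideSchema)
v1.saveAsParquetFile("/data/t.parquet")

// 3. Register the Parquet directory as a table.
sqlContext.parquetFile("/data/t.parquet").registerTempTable("t")

// 4. Later data that actually has the extra column inserts cleanly.
val v2 = sqlContext.applySchema(
  sc.parallelize(Seq(Row(3, "c", "now present"))), wideSchema)
v2.insertInto("t")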


On Mon, Dec 22, 2014 at 12:11 AM, Adam Gilmore <dr...@gmail.com>
wrote:

> Hi all,
>
> I understand that parquet allows for schema versioning automatically in
> the format; however, I'm not sure whether Spark supports this.
>
> I'm saving a SchemaRDD to a parquet file, registering it as a table, then
> doing an insertInto with a SchemaRDD with an extra column.
>
> The second SchemaRDD does in fact get inserted, but the extra column isn't
> present when I try to query it with Spark SQL.
>
> Is there anything I can do to get this working how I'm hoping?
>