Posted to user@spark.apache.org by Pavel Knoblokh <kn...@gmail.com> on 2017/10/02 11:37:31 UTC

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

If your processing task inherently processes input data by month, you
may want to "manually" partition the output data by month in addition to
partitioning by day, that is, save each month under a directory name that
includes the given month, e.g. "dataset.parquet/month=01". Then you will
be able to use the overwrite mode for each month partition. Hope this is
of some help.
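
For illustration, a minimal sketch of that layout in Scala (assuming a
DataFrame dfMonth that already holds exactly one month of data; names and
paths are placeholders):

    // Overwrite mode only wipes this month's directory, because the
    // target path itself scopes the overwrite; other month=... folders
    // are left untouched.
    dfMonth.write
      .mode("overwrite")
      .partitionBy("day")
      .parquet("dataset.parquet/month=2017-01")

Reading back "dataset.parquet/" should then still discover both month and
day as partition columns.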

-- 
Pavel Knoblokh

On Fri, Sep 29, 2017 at 5:31 PM, peay <pe...@protonmail.com> wrote:
> Hello,
>
> I am trying to use
> data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job to
> process a given month twice, I'll get duplicate data in all the subfolders
> for that month.
>
> Is there a way to do "append" in terms of the subfolders from partitionBy,
> but overwrite within each such partition? Any help would be appreciated.
>
> Thanks!



-- 
Pavel Knoblokh

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Posted by Nirav Patel <np...@xactlycorp.com>.
I tried the following to explicitly specify partition columns in the SQL
statement, and also tried different cases (upper and lower) for the
partition columns.

insert overwrite table $tableName PARTITION(P1, P2) select A, B, C, P1, P2
from updateTable.

Still getting:

Caused by:
org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition
columns
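
One possibility worth checking (an untested sketch, not a confirmed fix):
the {p1=, p2=, P1=..., P2=...} spec above suggests the metastore knows the
partition columns in lowercase, so using exactly those lowercase names in
the PARTITION clause, with non-strict dynamic partitioning enabled, might
avoid the duplicate spec entries. Here tableName, A, B, C, P1 and P2 are
the placeholders already used in this thread:

    spark.sql("SET hive.exec.dynamic.partition=true")
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    // Alias the DataFrame's partition columns to the table's lowercase
    // partition column names before the dynamic-partition insert.
    spark.sql(s"""
      INSERT OVERWRITE TABLE $tableName PARTITION (p1, p2)
      SELECT A, B, C, P1 AS p1, P2 AS p2
      FROM updateTable
    """)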



On Thu, Aug 2, 2018 at 11:37 AM, Nirav Patel <np...@xactlycorp.com> wrote:

> Thanks Koert. I'll check that out when we can update to 2.3
>
> Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
> overwrite multiple partitions (without losing the existing ones).
>
> It's giving me issues around partition columns.
>
>     dataFrame.createOrReplaceTempView("updateTable") // here the dataFrame
>     // contains values from multiple partitions
>
> dataFrame also has the partition columns, but I can't get any of the
> following to execute:
>
> insert overwrite table $tableName PARTITION(P1, P2) select * from
> updateTable.
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
> metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
> p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains non-partition columns;
>
>
> Is the above the right approach to update multiple partitions? Or should I
> be more specific, updating each partition with a separate command like the
> following:
>
> //Pseudo code; yet to try
>
> df.createOrReplaceTempView("updateTable")
> df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) =>
>
>
>   spark.sql("INSERT OVERWRITE TABLE stats
>   PARTITION(P1 = key._1, P2 = key._2)
>   SELECT * from updateTable where P1 = key._1 and P2 = key._2")
> }
>
> Regards,
> Nirav
>
>
> On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> this works for dataframes with spark 2.3 by changing a global setting,
>> and will be configurable per write in 2.4
>> see:
>> https://issues.apache.org/jira/browse/SPARK-20236
>> https://issues.apache.org/jira/browse/SPARK-24860
>>
>> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel <np...@xactlycorp.com>
>> wrote:
>>
>>> Hi Peay,
>>>
>>> Have you found a better solution yet? I am having the same issue.
>>>
>>> The following says it works with Spark 2.1 onward, but only when you use
>>> sqlContext and not the DataFrame API:
>>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a
>>>
>>> Thanks,
>>> Nirav
>>>
>>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh <kn...@gmail.com>
>>> wrote:
>>>
>>>> If your processing task inherently processes input data by month you
>>>> may want to "manually" partition the output data by month as well as
>>>> by day, that is to save it with a file name including the given month,
>>>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>>>> overwrite mode with each month partition. Hope this could be of some
>>>> help.
>>>>
>>>> --
>>>> Pavel Knoblokh
>>>>
>>>> On Fri, Sep 29, 2017 at 5:31 PM, peay <pe...@protonmail.com> wrote:
>>>> > Hello,
>>>> >
>>>> > I am trying to use
>>>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write
>>>> a
>>>> > dataset while splitting by day.
>>>> >
>>>> > I would like to run a Spark job  to process, e.g., a month:
>>>> > dataset.parquet/day=2017-01-01/...
>>>> > ...
>>>> >
>>>> > and then run another Spark job to add another month using the same
>>>> folder
>>>> > structure, getting me
>>>> > dataset.parquet/day=2017-01-01/
>>>> > ...
>>>> > dataset.parquet/day=2017-02-01/
>>>> > ...
>>>> >
>>>> > However:
>>>> > - with save mode "overwrite", when I process the second month, all of
>>>> > dataset.parquet/ gets removed and I lose whatever was already
>>>> computed for
>>>> > the previous month.
>>>> > - with save mode "append", then I can't get idempotence: if I run the
>>>> job to
>>>> > process a given month twice, I'll get duplicate data in all the
>>>> subfolders
>>>> > for that month.
>>>> >
>>>> > Is there a way to do "append" in terms of the subfolders from partitionBy,
>>>> > but overwrite within each such partition? Any help would be appreciated.
>>>> >
>>>> > Thanks!
>>>>
>>>>
>>>>
>>>> --
>>>> Pavel Knoblokh
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>>
>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Posted by Nirav Patel <np...@xactlycorp.com>.
Thanks Koert. I'll check that out when we can update to 2.3

Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
overwrite multiple partitions (without losing the existing ones).

It's giving me issues around partition columns.

    dataFrame.createOrReplaceTempView("updateTable") // here the dataFrame
    // contains values from multiple partitions

dataFrame also has the partition columns, but I can't get any of the
following to execute:

insert overwrite table $tableName PARTITION(P1, P2) select * from
updateTable.

org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains
non-partition columns;


Is the above the right approach to update multiple partitions? Or should I
be more specific, updating each partition with a separate command like the
following:

//Pseudo code; yet to try

df.createOrReplaceTempView("updateTable")
df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) =>


  spark.sql("INSERT OVERWRITE TABLE stats
  PARTITION(P1 = key._1, P2 = key._2)
  SELECT * from updateTable where P1 = key._1 and P2 = key._2")
}
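
For what it's worth, a runnable variant of that pseudo code could look
roughly like the sketch below (untested; A, B, C and the lowercase p1/p2
partition column names are placeholders). The main change is collecting the
distinct partition keys to the driver first, since spark.sql cannot be
called from inside an RDD transformation running on the executors:

    df.createOrReplaceTempView("updateTable")
    // One static INSERT OVERWRITE per (P1, P2) combination present in df.
    df.select("P1", "P2").distinct().collect().foreach { row =>
      val (k1, k2) = (row.get(0), row.get(1))
      spark.sql(s"""
        INSERT OVERWRITE TABLE stats PARTITION (p1 = '$k1', p2 = '$k2')
        SELECT A, B, C FROM updateTable
        WHERE P1 = '$k1' AND P2 = '$k2'
      """)
    }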

Regards,
Nirav


On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers <ko...@tresata.com> wrote:

> this works for dataframes with spark 2.3 by changing a global setting, and
> will be configurable per write in 2.4
> see:
> https://issues.apache.org/jira/browse/SPARK-20236
> https://issues.apache.org/jira/browse/SPARK-24860
>
> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel <np...@xactlycorp.com> wrote:
>
>> Hi Peay,
>>
>> Have you found a better solution yet? I am having the same issue.
>>
>> The following says it works with Spark 2.1 onward, but only when you use
>> sqlContext and not the DataFrame API:
>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a
>>
>> Thanks,
>> Nirav
>>
>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh <kn...@gmail.com>
>> wrote:
>>
>>> If your processing task inherently processes input data by month you
>>> may want to "manually" partition the output data by month as well as
>>> by day, that is to save it with a file name including the given month,
>>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>>> overwrite mode with each month partition. Hope this could be of some
>>> help.
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> On Fri, Sep 29, 2017 at 5:31 PM, peay <pe...@protonmail.com> wrote:
>>> > Hello,
>>> >
>>> > I am trying to use
>>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>>> > dataset while splitting by day.
>>> >
>>> > I would like to run a Spark job  to process, e.g., a month:
>>> > dataset.parquet/day=2017-01-01/...
>>> > ...
>>> >
>>> > and then run another Spark job to add another month using the same
>>> folder
>>> > structure, getting me
>>> > dataset.parquet/day=2017-01-01/
>>> > ...
>>> > dataset.parquet/day=2017-02-01/
>>> > ...
>>> >
>>> > However:
>>> > - with save mode "overwrite", when I process the second month, all of
>>> > dataset.parquet/ gets removed and I lose whatever was already computed
>>> for
>>> > the previous month.
>>> > - with save mode "append", then I can't get idempotence: if I run the
>>> job to
>>> > process a given month twice, I'll get duplicate data in all the
>>> subfolders
>>> > for that month.
>>> >
>>> > Is there a way to do "append" in terms of the subfolders from partitionBy,
>>> > but overwrite within each such partition? Any help would be appreciated.
>>> >
>>> > Thanks!
>>>
>>>
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>
>>>
>>
>>
>>
>
>
>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Posted by Koert Kuipers <ko...@tresata.com>.
This works for DataFrames with Spark 2.3 by changing a global setting, and
will be configurable per write in 2.4. See:
https://issues.apache.org/jira/browse/SPARK-20236
https://issues.apache.org/jira/browse/SPARK-24860
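
A minimal sketch of the 2.3 route (assuming the day-partitioned parquet
layout from the original question, with df holding the month being
reprocessed):

    // With "dynamic" partition overwrite, mode("overwrite") replaces only
    // the day=... partitions present in df; all other partitions under
    // dataset.parquet/ are kept.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write
      .mode("overwrite")
      .partitionBy("day")
      .parquet("dataset.parquet")

In 2.4 the same behaviour should become selectable per write rather than
via the global conf, per SPARK-24860.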

On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel <np...@xactlycorp.com> wrote:

> Hi Peay,
>
> Have you found a better solution yet? I am having the same issue.
>
> The following says it works with Spark 2.1 onward, but only when you use
> sqlContext and not the DataFrame API:
> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a
>
> Thanks,
> Nirav
>
> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh <kn...@gmail.com> wrote:
>
>> If your processing task inherently processes input data by month you
>> may want to "manually" partition the output data by month as well as
>> by day, that is to save it with a file name including the given month,
>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>> overwrite mode with each month partition. Hope this could be of some
>> help.
>>
>> --
>> Pavel Knoblokh
>>
>> On Fri, Sep 29, 2017 at 5:31 PM, peay <pe...@protonmail.com> wrote:
>> > Hello,
>> >
>> > I am trying to use
>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>> > dataset while splitting by day.
>> >
>> > I would like to run a Spark job  to process, e.g., a month:
>> > dataset.parquet/day=2017-01-01/...
>> > ...
>> >
>> > and then run another Spark job to add another month using the same
>> folder
>> > structure, getting me
>> > dataset.parquet/day=2017-01-01/
>> > ...
>> > dataset.parquet/day=2017-02-01/
>> > ...
>> >
>> > However:
>> > - with save mode "overwrite", when I process the second month, all of
>> > dataset.parquet/ gets removed and I lose whatever was already computed
>> for
>> > the previous month.
>> > - with save mode "append", then I can't get idempotence: if I run the
>> job to
>> > process a given month twice, I'll get duplicate data in all the
>> subfolders
>> > for that month.
>> >
>> > Is there a way to do "append" in terms of the subfolders from partitionBy,
>> > but overwrite within each such partition? Any help would be appreciated.
>> >
>> > Thanks!
>>
>>
>>
>> --
>> Pavel Knoblokh
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>
>
>
>

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Posted by Nirav Patel <np...@xactlycorp.com>.
Hi Peay,

Have you found a better solution yet? I am having the same issue.

The following says it works with Spark 2.1 onward, but only when you use
sqlContext and not the DataFrame API:
https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a
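
If that article describes the approach I think it does (an assumption on my
part, not a summary of the post), it amounts to routing the write through a
Hive dynamic-partition INSERT instead of the DataFrame writer, roughly:

    // Hypothetical names: a Hive table "stats" partitioned by "day",
    // fed from a temp view over the new month's data.
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    df.createOrReplaceTempView("updates")
    spark.sql("""
      INSERT OVERWRITE TABLE stats PARTITION (day)
      SELECT some_col, other_col, day FROM updates
    """)

Only the day partitions that actually appear in the select output should
get replaced; the rest are left alone.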

Thanks,
Nirav

On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh <kn...@gmail.com> wrote:

> If your processing task inherently processes input data by month you
> may want to "manually" partition the output data by month as well as
> by day, that is to save it with a file name including the given month,
> i.e. "dataset.parquet/month=01". Then you will be able to use the
> overwrite mode with each month partition. Hope this could be of some
> help.
>
> --
> Pavel Knoblokh
>
> On Fri, Sep 29, 2017 at 5:31 PM, peay <pe...@protonmail.com> wrote:
> > Hello,
> >
> > I am trying to use
> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> > dataset while splitting by day.
> >
> > I would like to run a Spark job  to process, e.g., a month:
> > dataset.parquet/day=2017-01-01/...
> > ...
> >
> > and then run another Spark job to add another month using the same folder
> > structure, getting me
> > dataset.parquet/day=2017-01-01/
> > ...
> > dataset.parquet/day=2017-02-01/
> > ...
> >
> > However:
> > - with save mode "overwrite", when I process the second month, all of
> > dataset.parquet/ gets removed and I lose whatever was already computed
> for
> > the previous month.
> > - with save mode "append", then I can't get idempotence: if I run the
> job to
> > process a given month twice, I'll get duplicate data in all the
> subfolders
> > for that month.
> >
> > Is there a way to do "append" in terms of the subfolders from partitionBy,
> > but overwrite within each such partition? Any help would be appreciated.
> >
> > Thanks!
>
>
>
> --
> Pavel Knoblokh
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>
