Posted to dev@iceberg.apache.org by Dave Sugden <da...@shopify.com.INVALID> on 2019/08/14 16:03:55 UTC

Iceberg Spark PartitionedWriter with Kafka source

Hi,
We would like to be able to use the Iceberg Spark DataSource
(IcebergSource) to write Kafka-sourced streaming DataFrames.

In tests, we are able to successfully create a partitioned table and write
when using the MemoryStream, but when using a Kafka source:

spark.readStream.format("kafka")

and writing to iceberg:

dataFrame.writeStream
  .format("catwalk-iceberg")
  .outputMode(OutputMode.Append)
  .trigger(Trigger.Once)
  .option("path", uri.toString)
  .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
  .start
  .awaitTermination

we get this exception:

Caused by: java.lang.IllegalStateException: Already closed file for
partition: happened_at_day=2000-01-01
at
org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
at
org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)

Before I dig deeper, is this something that has worked for anyone?

Thanks!

Re: Iceberg Spark PartitionedWriter with Kafka source

Posted by Dave Sugden <da...@shopify.com.INVALID>.
The repartition by happened_at_day worked. Thanks!
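
Roughly what that looks like, for the archive (a sketch only, not the exact code; it assumes the incoming frame already carries the happened_at_day column used by the table's partition spec, and it reuses the names from the snippet quoted below):

  import org.apache.spark.sql.functions.col
  // OutputMode, Trigger, Paths and uri as in the original snippet below.

  dataFrame
    .repartition(col("happened_at_day"))   // colocate each day's rows in one task
    .writeStream
    .format("catwalk-iceberg")
    .outputMode(OutputMode.Append)
    .trigger(Trigger.Once)
    .option("path", uri.toString)
    .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
    .start
    .awaitTermination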


On Wed, Aug 14, 2019 at 12:53 PM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> Anton's solution is right. Iceberg doesn't allow any task to open more
> than one file in a partition to avoid creating a huge number of output
> files.
>
> We generally recommend applying a global sort to data when writing from
> batch, or adding a repartition to ensure that each task gets the data for
> just one partition. That may not work here, though.
>
> Looks like the problem here is that your data has records for 2000-01-01
> mixed with, say, 2000-01-02. Iceberg closes the 2000-01-01 file to open a
> file for 2000-01-02. Then it needs 2000-01-01 again, finds that it's
> already closed that file, and throws the exception.
>
> If you can repartition by happened_at_day, that would fix it. Otherwise, I
> think it may be a good idea to keep files open in the streaming writer. We
> don't do that in batch because it can take so much memory, but in streaming
> you can't necessarily add a sortWithinPartitions to group the data together.
>
> On Wed, Aug 14, 2019 at 9:12 AM Anton Okolnychyi
> <ao...@apple.com.invalid> wrote:
>
>> Hi,
>>
>> The exception you see is because of a check in Iceberg that prevents the
>> same Spark task from writing too many small files for the same partition. It
>> is the same for batch and stream writes. To avoid that, you should colocate
>> all records for the same data partition within your Spark partition. That
>> can be done by sorting the data by partition columns either globally or
>> within partitions.
>>
>> Spark file source does the same but implicitly [1].
>>
>> Hope that helps,
>> Anton
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179
>>
>>
>> On 14 Aug 2019, at 17:03, Dave Sugden <da...@shopify.com.INVALID>
>> wrote:
>>
>> Hi,
>> We would like to be able to use the Iceberg Spark DataSource
>> (IcebergSource) to write Kafka-sourced streaming DataFrames.
>>
>> In tests, we are able to successfully create a partitioned table and write
>> when using the MemoryStream, but when using a Kafka source:
>>
>> spark.readStream.format("kafka")
>>
>> and writing to iceberg:
>>
>> dataFrame.writeStream
>>   .format("catwalk-iceberg")
>>   .outputMode(OutputMode.Append)
>>   .trigger(Trigger.Once)
>>   .option("path", uri.toString)
>>   .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
>>   .start
>>   .awaitTermination
>>
>> we get this exception:
>>
>> Caused by: java.lang.IllegalStateException: Already closed file for
>> partition: happened_at_day=2000-01-01
>> at
>> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
>> at
>> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
>>
>> Before I dig deeper, is this something that has worked for anyone?
>>
>> Thanks!
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Iceberg Spark PartitionedWriter with Kafka source

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Anton's solution is right. Iceberg doesn't allow any task to open more than
one file in a partition to avoid creating a huge number of output files.

We generally recommend applying a global sort to data when writing from
batch, or adding a repartition to ensure that each task gets the data for
just one partition. That may not work here, though.

Looks like the problem here is that your data has records for 2000-01-01
mixed with, say, 2000-01-02. Iceberg closes the 2000-01-01 file to open a
file for 2000-01-02. Then it needs 2000-01-01 again, finds that it's
already closed that file, and throws the exception.

If you can repartition by happened_at_day, that would fix it. Otherwise, I
think it may be a good idea to keep files open in the streaming writer. We
don't do that in batch because it can take so much memory, but in streaming
you can't necessarily add a sortWithinPartitions to group the data together.
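
To make that failure mode concrete, here is a simplified sketch of the single-open-file check (illustrative Scala only, not the actual PartitionedWriter code):

  import scala.collection.mutable

  // One task's view: at most one file is open at a time, and a partition
  // that was already closed cannot be reopened.
  def writeAll(partitionKeys: Iterator[String]): Unit = {
    var current: Option[String] = None
    val closed = mutable.Set.empty[String]
    partitionKeys.foreach { key =>
      if (!current.contains(key)) {
        current.foreach(closed.add)       // close the file for the previous key
        if (closed(key)) {
          throw new IllegalStateException(s"Already closed file for partition: $key")
        }
        current = Some(key)               // open a new file for this key
      }
      // ... append the row to the current file
    }
  }

  // Unsorted task input: 2000-01-01, 2000-01-02, 2000-01-01  -> throws
  // Grouped task input:  2000-01-01, 2000-01-01, 2000-01-02  -> writes fine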

On Wed, Aug 14, 2019 at 9:12 AM Anton Okolnychyi
<ao...@apple.com.invalid> wrote:

> Hi,
>
> The exception you see is because of a check in Iceberg that prevents the
> same Spark task from writing too many small files for the same partition. It
> is the same for batch and stream writes. To avoid that, you should colocate
> all records for the same data partition within your Spark partition. That
> can be done by sorting the data by partition columns either globally or
> within partitions.
>
> Spark file source does the same but implicitly [1].
>
> Hope that helps,
> Anton
>
> [1] -
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179
>
>
> On 14 Aug 2019, at 17:03, Dave Sugden <da...@shopify.com.INVALID>
> wrote:
>
> Hi,
> We would like to be able to use the Iceberg Spark DataSource
> (IcebergSource) to write Kafka-sourced streaming DataFrames.
>
> In tests, we are able to successfully create a partitioned table and write
> when using the MemoryStream, but when using a Kafka source:
>
> spark.readStream.format("kafka")
>
> and writing to iceberg:
>
> dataFrame.writeStream
>   .format("catwalk-iceberg")
>   .outputMode(OutputMode.Append)
>   .trigger(Trigger.Once)
>   .option("path", uri.toString)
>   .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
>   .start
>   .awaitTermination
>
> we get this exception:
>
> Caused by: java.lang.IllegalStateException: Already closed file for
> partition: happened_at_day=2000-01-01
> at
> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
> at
> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
>
> Before I dig deeper, is this something that has worked for anyone?
>
> Thanks!
>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Iceberg Spark PartitionedWriter with Kafka source

Posted by Anton Okolnychyi <ao...@apple.com.INVALID>.
Hi,

The exception you see is because of a check in Iceberg that prevents the same Spark task from writing too many small files for the same partition. It is the same for batch and stream writes. To avoid that, you should colocate all records for the same data partition within your Spark partition. That can be done by sorting the data by partition columns either globally or within partitions.

Spark file source does the same but implicitly [1].

Hope that helps,
Anton

[1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179
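
As a rough illustration of the within-partition option for a batch write (a sketch only; happened_at_day is the partition column from the report below, and the source name and table location are placeholders for whatever your setup registers):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  // Sort each Spark task's rows by the table's partition column so that all
  // rows for a given day arrive contiguously and the task keeps at most one
  // file open at a time.
  def appendSorted(df: DataFrame, tableLocation: String): Unit = {
    df.sortWithinPartitions(col("happened_at_day"))
      .write
      .format("iceberg")        // or a custom source name such as "catwalk-iceberg"
      .mode("append")
      .save(tableLocation)
  }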

> On 14 Aug 2019, at 17:03, Dave Sugden <da...@shopify.com.INVALID> wrote:
> 
> Hi,
> We would like to be able to use the Iceberg Spark DataSource (IcebergSource) to write Kafka-sourced streaming DataFrames.
> 
> In tests, we are able to successfully create a partitioned table and write when using the MemoryStream, but when using a Kafka source:
> 
> spark.readStream.format("kafka")
> 
> and writing to iceberg:
> 
> dataFrame.writeStream
>           .format("catwalk-iceberg")
>           .outputMode(OutputMode.Append)
>           .trigger(Trigger.Once)
>           .option("path", uri.toString)
>           .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
>           .start
>           .awaitTermination
> 
> we get this exception:
> 
> Caused by: java.lang.IllegalStateException: Already closed file for partition: happened_at_day=2000-01-01
> at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
> at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
> 
> Before I dig deeper, is this something that has worked for anyone?
> 
> Thanks!
> 