Posted to dev@iceberg.apache.org by Peter Giles <gi...@uw.edu> on 2021/07/08 18:59:25 UTC

Can Iceberg write to new files in the table for each micro-batch?

Hi all, I have a non-Iceberg Spark streaming process that I'm trying
to re-engineer, and I'm running into some trouble making it happen
with Iceberg.  I think I'm using a fairly common pattern, so I wonder
if someone here can give me a tip on how to go about it.  I'll try to
be concise but give enough detail to convey the problem:

It's a Spark streaming app that takes micro-batches of data from a
Kafka topic, does some transformation on it, sorts the in-flight data,
and then writes each micro-batch out into an existing ORC "table" (not
a Hive table, just an HDFS directory that contains all of the
partitions).  The table is partitioned by date+hour, and within each
partition it's ordered by a string field.  There is a constant,
high-volume stream of incoming data, so micro-batches are processed
frequently, and each one creates an additional set of ORC files within
the table.  That results in far more files than is optimal, so once
all the data for an hour has finally been written out, a separate job
"compacts"/coalesces that hour of data (in other words, it gets
re-written to a smaller number of ORC files).
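
For context, here is a rough sketch of that pattern in Scala
(spark-shell style, assuming the predefined spark session).  The
topic, paths, and the sort_key column are placeholders I've made up to
show the shape, not the real job:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", "events")                       // placeholder topic
      .load()

    // Placeholder transformation: parse the Kafka value and derive the
    // date+hour partition columns plus the string sort key.
    val parsed = raw
      .selectExpr("CAST(value AS STRING) AS payload", "timestamp")
      .withColumn("date", to_date($"timestamp"))
      .withColumn("hour", hour($"timestamp"))
      .withColumn("sort_key", substring($"payload", 1, 16))

    val query = parsed.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Each micro-batch is sorted and then written as a fresh set of ORC
        // files under the date=/hour= directories.
        batch.sortWithinPartitions("date", "hour", "sort_key")
          .write
          .mode("append")
          .partitionBy("date", "hour")
          .orc("hdfs:///data/events")                      // placeholder path
      }
      .option("checkpointLocation", "hdfs:///checkpoints/events")
      .start()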

Why do it this way?
* Data is available for search/analysis almost immediately.  All the
previous hours of data, having been compacted, are well optimized, and
one poorly optimized hour is a fair trade for being able to access the
most recent data too.
* Writing many smaller ORC files for the current hour allows each file
to keep the correct ordering, which turns out to be important: using
ORC's bloom filters (AKA "light indexes") in combination with the
sorted data vastly improves search performance (sketch below).
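
To make that last point concrete, here is a minimal sketch of how the
bloom filter settings get passed through Spark's ORC writer.  The
column name and false-positive rate are placeholders; the orc.* keys
are standard ORC writer options that Spark forwards to the ORC
library:

    import org.apache.spark.sql.DataFrame

    // Sketch only: "sort_key" stands in for the real string field that each
    // partition is ordered by.
    def writeSortedHour(batch: DataFrame, path: String): Unit =
      batch.sortWithinPartitions("date", "hour", "sort_key") // keep rows ordered within each file
        .write
        .mode("append")
        .partitionBy("date", "hour")
        .option("orc.bloom.filter.columns", "sort_key")      // bloom filters on the search column
        .option("orc.bloom.filter.fpp", "0.05")              // target false-positive probability
        .orc(path)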

The major pain point is "compaction": because that process rewrites
the hour of data and then replaces the existing files, it will break
any already running analyses that happen to need rows from that hour.
I want to refactor to use Iceberg so that I can seamlessly do those
compactions thanks to snapshots.

What I *think* I need is a way to get Iceberg to create new files
within the table for each micro-batch.  At first I thought that
perhaps the SparkPartitionedFanoutWriter might be the right tool, but
(a) it doesn't seem to support ORC, and (b), if I'm reading it right,
it wants to use a size threshold to decide when to write to additional
files, which isn't what I need.  Is there a simple answer here, or
would I need a new feature in Iceberg to support this use case?  Or
maybe this is an outdated pattern, and I should be doing it a
different way?

Thank you for bearing with me.  Any suggestions are appreciated.

- Peter

Re: Can Iceberg write to new files in the table for each micro-batch?

Posted by Ryan Blue <bl...@tabular.io>.
Hi Peter,

You should be able to use Spark Structured Streaming to write micro-batches
to an Iceberg table. That's documented on the Iceberg site under Structured
Streaming <https://iceberg.apache.org/spark-structured-streaming/>, and you
can check out the tests
<https://github.com/apache/iceberg/blob/master/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java#L86>
if you want full examples. That should support ORC just fine.
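
For reference, here is a minimal sketch of the streaming append, based
on the Structured Streaming page linked above (spark-shell style).  The
table identifier, topic, checkpoint path, and trigger interval are
placeholders, and the Iceberg table is assumed to already exist with a
matching schema:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
      .option("subscribe", "events")                        // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING) AS payload", "timestamp")

    val query = events.writeStream
      .format("iceberg")
      .outputMode("append")                     // each micro-batch appends new data files
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .option("path", "db.events")              // placeholder table identifier
      .option("checkpointLocation", "hdfs:///checkpoints/iceberg-events")
      .start()

If the per-file sort order still matters for the current hour, a
foreachBatch sink can sort each micro-batch before appending it to the
Iceberg table.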

For compaction, check out the new MERGE INTO command, which can use a
"replace" operation to rewrite your data. That will help any streaming
consumers, since replace snapshots don't change the table's data and
can safely be ignored.
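
As a concrete illustration, here is a hedged sketch of one way to
compact a closed hour.  It uses the rewriteDataFiles action rather
than MERGE INTO, the table and partition values are placeholders, and
the exact entry points have moved between Iceberg releases, so treat
it as the shape rather than the letter:

    import org.apache.iceberg.expressions.Expressions
    import org.apache.iceberg.spark.Spark3Util
    import org.apache.iceberg.spark.actions.SparkActions

    // Placeholder table identifier; load the Iceberg table behind it.
    val table = Spark3Util.loadIcebergTable(spark, "db.events")

    SparkActions.get(spark)
      .rewriteDataFiles(table)
      // Only compact the hour that has finished arriving (placeholder values).
      .filter(Expressions.and(
        Expressions.equal("date", "2021-07-08"),
        Expressions.equal("hour", 14)))
      .option("target-file-size-bytes", (512L * 1024 * 1024).toString)
      .execute()

Because the rewrite produces a replace-style snapshot, readers and
streaming consumers see the same rows before and after the compaction
runs.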

I hope that covers it, but please reply if you're still running into any
issues.

Ryan


-- 
Ryan Blue
Tabular