Posted to dev@druid.apache.org by Gian Merlino <gi...@apache.org> on 2020/04/30 07:26:11 UTC

Streaming updates and deletes

Hey Druids,

Now that a join operator exists and is well on its way to being useful, I
started thinking about some other pie-in-the-sky ideas. One that seems
particularly useful is supporting updates and deletes.

Of course, we support updates and deletes today, but only on a
whole-time-chunk basis (you need to rewrite the data for a time chunk in
order to update or delete even a single row). Being able to do it based on
streaming data would open up new use cases.

One that comes up a lot is financial stuff: imagine a dataset where a row
is a transaction (with a transaction ID), and there is a transaction status
field that might go through a few updates until it finally settles. Another
is session analysis: imagine a dataset where a row is a session (with a
session ID) and as new events come in for that session, you want to update
things like the number of events for the session, session duration, etc.

What seems to be common are the following properties.

1) Updates are done by primary key.

2) The total number of primary keys is very large (could be in the
trillions).

3) Each individual key generally gets a small number of updates, relatively
soon after it's inserted, and eventually stops receiving updates.

I don't have a real proposal yet nor the bandwidth to actually work on this
right now. But I wanted to ask if anyone has thoughts on this potential
approach that I think would work for streams with those properties.

Imagine something like this:

- We could encode an update as a delete followed by an insert. For this to
work, both need to be associated with the segment that contained the
original record.

- In deep storage: one segment, instead of being one zip file, is multiple
zip files, identified by a fileNumber. The first file looks like segments
do today. Subsequent files include a normal-looking smoosh file like
today's zips, but also include a bitmap of deleted row IDs from the
previous file. Each zip with a fileNumber > 0 encodes incremental deletes
(in the deleted-row bitmap) and new inserts (in the regular smoosh file).
Let's call these "delta files" and call the one with fileNumber 0 the
"original file". The delta files only contain deletes and inserts for
primary keys that are present in the original file.

- In the metadata store: we'll now have multiple rows per segment in the
segment table (one for the original file and one per delta file). Same for
the pending segment table. The datasource table (where Kafka topic
checkpoints are stored) will need to associate checkpoints with the latest
file number published for a given segment.

- On real-time indexers: they already have code paths that decide what
segments to route incoming records to. We need to add an ability to route
updates and deletes to new delta files for existing segments. But unlike
regular segments, the indexers can't serve queries on top of the delta
files. Queries must be served by the Historicals that are serving those
segments (otherwise, the deltas won't get applied properly). So indexers
will be creating and publishing these files but not serving them live.

- On historicals: they need to fetch all available delta files from deep
storage for the segments they're serving. They should probably expose the
entire segment (including all delta files) as a single StorageAdapter, so
the query stack doesn't have to change.
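
To make the delta file idea a bit more concrete, here is a minimal sketch of
how a Historical might collapse an original file plus its delta files into
one logical view. Everything in it is hypothetical: DeltaSegmentSketch,
SegmentFile, and liveRows are made-up names, rows are plain strings standing
in for smoosh file contents, and java.util.BitSet stands in for whatever
bitmap format we'd really use. It also assumes row IDs are numbered
continuously across files in fileNumber order, so a delta file's bitmap can
delete rows from any earlier file. That is one possible reading of "deleted
row IDs from the previous file", not a settled design.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch; none of these types exist in Druid.
final class DeltaSegmentSketch {
    /** One file of a segment: the rows it adds and the earlier row IDs it deletes. */
    static final class SegmentFile {
        final List<String> rows;     // stand-in for the smoosh file contents
        final BitSet deletedRowIds;  // empty for the original file (fileNumber 0)

        SegmentFile(List<String> rows, int... deletedRowIds) {
            this.rows = rows;
            this.deletedRowIds = new BitSet();
            for (int rowId : deletedRowIds) {
                this.deletedRowIds.set(rowId);
            }
        }
    }

    /**
     * Returns the rows still visible after applying every file in fileNumber order.
     * Assumption: row IDs run continuously across files, so the first row of a
     * delta file gets the ID that follows the last row of the file before it.
     */
    static List<String> liveRows(List<SegmentFile> filesInOrder) {
        List<String> allRows = new ArrayList<>();
        BitSet deleted = new BitSet();
        for (SegmentFile file : filesInOrder) {
            deleted.or(file.deletedRowIds); // deletes point at rows from earlier files
            allRows.addAll(file.rows);      // inserts are appended after them
        }
        List<String> live = new ArrayList<>();
        for (int rowId = 0; rowId < allRows.size(); rowId++) {
            if (!deleted.get(rowId)) {
                live.add(allRows.get(rowId));
            }
        }
        return live;
    }
}
```

For example, with an original file holding "txn1:pending" and "txn2:pending",
a first delta file that deletes row 0 and inserts "txn1:settled", and a
second delta file that only deletes row 1, liveRows returns just
"txn1:settled". A real StorageAdapter would presumably skip deleted rows
lazily instead of materializing a list.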

Some additional notes:

- I left out the part about how indexers figure out what segment to add an
update or delete record to. There has to be some kind of
primary-key-to-segment mapping to make this work. The most obvious way to
do this is to partition segments by primary key. But segments are currently
partitioned by time, not primary key, and I suspect we wouldn't want to
change that. One reason is that I think we still want to target use cases
where there is a heavy time component, and time partitioning is ideal for
that. Another is that we do expect older records to eventually stop
receiving updates, and if we partition by time, it means most segments
won't be receiving updates, which is good (it limits the performance hit of
handling delta files to a small number of segments). So we'd need another
solution. I haven't thought of a perfect one. I think we'll probably have
to require that the input stream is partitioned by primary key, and take
advantage of that to split up the information that indexers will need in
order to map records to the correct segments. I think this area is the
biggest unknown.

- Assuming we keep partitioning by time, it means the update mechanism
would be able to update any field of a row except the timestamp (updating
the timestamp would invalidate partitioning and sorting assumptions). I'm
not sure if this would be generally OK or if it would rule out certain use
cases.

- I left out what an update record from the stream would look like. Do we
ask people to provide upserts (entire new record), or delta updates
(differences from the previous record)? Or we could decide to support both.
By the way: no matter how we accept incoming updates, I think we should
store them in delta files as entire new records. That way we don't have to
cross-check multiple files to see the latest record.

- This sketch of an implementation isn't actually real-time. It is able to
accept updates and deletes from a stream, but there is a delay in actually
serving them (a delta file needs to be generated and pushed to
Historicals). One way to make the serving real-time too is for the indexers
to provide an API for historicals to fetch a stream of delta records that
are relevant to their segments. The historicals would need to use that
stream to achieve the same effect as a delta file. Presumably, whatever it
is they're doing with it would be replaced with the "official" delta files
when they're available.

- For correctness, we'll need some mechanism for a historical to know when
it has "enough" delta files to begin serving a segment. Otherwise you'll
see mysterious rollbacks when segments move from one historical to another.

- For efficiency, we'll need to compact delta files into the base segments.
We could use the existing compaction system for this.

- For performance, assuming we present the multi-file segment as a single
StorageAdapter, we'll need to be able to generate selectors that are as
efficient as possible. This suggests we will need flags in the delta files
that say whether they use the same dictionary as the original segment or
not. If they do, we can generate a normal selector. If they don't then we
will presumably be allocating new dictionary IDs for new values. They won't
be sorted anymore, so we'll need to generate a selector where
ColumnCapabilities.areDictionaryValuesSorted returns false.
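
As a rough illustration of that last point about dictionaries, here is a
sketch of how a delta file might either reuse the original dictionary or
extend it with new IDs. DeltaDictionarySketch and its methods are
hypothetical names, not Druid classes, and the areDictionaryValuesSorted
method here is a plain boolean that only mirrors the name of the
ColumnCapabilities flag. It conservatively reports "unsorted" as soon as any
new value is appended, even if the appended values happen to land in sorted
order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not Druid's actual dictionary encoding.
final class DeltaDictionarySketch {
    private final List<String> values;            // dictionary ID -> value
    private final Map<String, Integer> idByValue; // value -> dictionary ID
    private final boolean sharesOriginalDictionary;

    private DeltaDictionarySketch(List<String> values, Map<String, Integer> idByValue,
                                  boolean sharesOriginalDictionary) {
        this.values = values;
        this.idByValue = idByValue;
        this.sharesOriginalDictionary = sharesOriginalDictionary;
    }

    /**
     * Builds the dictionary seen by a delta file. Existing values keep their
     * original IDs; values not in the original dictionary get new IDs appended
     * at the end, which generally breaks the sort order.
     */
    static DeltaDictionarySketch extend(List<String> sortedOriginal, List<String> deltaValues) {
        List<String> merged = new ArrayList<>(sortedOriginal);
        Map<String, Integer> idByValue = new HashMap<>();
        for (int id = 0; id < merged.size(); id++) {
            idByValue.put(merged.get(id), id);
        }
        boolean shares = true;
        for (String value : deltaValues) {
            if (!idByValue.containsKey(value)) {
                idByValue.put(value, merged.size()); // allocate the next free ID
                merged.add(value);
                shares = false;
            }
        }
        return new DeltaDictionarySketch(merged, idByValue, shares);
    }

    /** The flag a delta file would carry: true if no new IDs were allocated. */
    boolean sharesOriginalDictionary() {
        return sharesOriginalDictionary;
    }

    /** Conservative: treated as unsorted once any value has been appended. */
    boolean areDictionaryValuesSorted() {
        return sharesOriginalDictionary;
    }

    /** Returns the dictionary ID for a value, or -1 if it is absent. */
    int idOf(String value) {
        Integer id = idByValue.get(value);
        return id == null ? -1 : id;
    }

    int size() {
        return values.size();
    }
}
```

With an original dictionary of "pending" and "settled", a delta file that
only writes "settled" shares the dictionary and can use a normal selector. A
delta file that writes "failed" gets ID 2 for it, and the selector has to
report the dictionary as unsorted.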

Would love to hear what people think.

Gian