Posted to dev@druid.apache.org by Yuanli Han <yu...@shopee.com> on 2020/05/11 13:33:18 UTC

Re: Streaming updates and deletes

Hi Gian, 

Thank you for sharing your thoughts on such an exciting improvement for Druid.

I would like to share my experience implementing status updates on top of Druid (0.14.x). Last year, I used Druid to analyze real-time logistics data for e-commerce. Imagine an order going through several status updates until it finally arrives at the customer's doorstep.

I implemented the status updating functionality through a kind of workaround which introduces a user-defined StatusAggregator. The StatusAggregator handles a complex metric which contains a timestamp and a status value. To achieve status updating, the aggregator always returns the status value paired with the latest timestamp. It works well in my case, and it is compatible with the Druid system through the extension API.
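
For reference, here is a minimal, self-contained sketch of the core "latest status wins" logic behind that aggregator (class and field names are illustrative only; in the real extension this logic sits inside the custom aggregator created by an AggregatorFactory):

    // Illustrative sketch of the combine step used by the custom StatusAggregator:
    // keep whichever status value carries the later timestamp.
    public class TimestampedStatus {
        public final long timestampMillis; // event time of the status update
        public final String status;        // e.g. "CREATED", "SHIPPED", "DELIVERED"

        public TimestampedStatus(long timestampMillis, String status) {
            this.timestampMillis = timestampMillis;
            this.status = status;
        }

        /** Fold two partial results: the later timestamp wins. */
        public static TimestampedStatus combine(TimestampedStatus a, TimestampedStatus b) {
            if (a == null) return b;
            if (b == null) return a;
            return b.timestampMillis >= a.timestampMillis ? b : a;
        }

        public static void main(String[] args) {
            TimestampedStatus result = null;
            result = combine(result, new TimestampedStatus(1000L, "CREATED"));
            result = combine(result, new TimestampedStatus(3000L, "DELIVERED"));
            result = combine(result, new TimestampedStatus(2000L, "SHIPPED"));
            System.out.println(result.status); // prints DELIVERED even though it arrived out of order
        }
    }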

That's the simple use case I want to share. It may be a short-term solution for some use cases.

Best regards,
Yuanli Han


On 2020/04/30 07:26:11, Gian Merlino <g....@apache.org> wrote: 
> Hey Druids,
>
> Now that a join operator exists and is well on its way to being useful, I started thinking about some other pie-in-the-sky ideas. In particular, one that seems very useful is supporting updates and deletes.
> 
> Of course, we support updates and deletes today, but only on a whole-time-chunk basis (you need to rewrite the data for a time chunk in order to update or delete even a single row). Being able to do it based on streaming data would open up new use cases.
> 
> One that comes up a lot is financial stuff: imagine a dataset where a row is a transaction (with a transaction ID), and there is a transaction status field that might go through a few updates until it finally settles. Another is session analysis: imagine a dataset where a row is a session (with a session ID), and as new events come in for that session, you want to update things like the number of events for the session, session duration, etc.
> 
> What seems to be common are the following properties.
>
> 1) Updates are done by primary key.
>
> 2) The total number of primary keys is very large (could be in the trillions).
>
> 3) Each individual key generally gets a small number of updates, relatively soon after it's inserted, and eventually stops receiving updates.
> 
> I don't have a real proposal yet, nor the bandwidth to actually work on this right now. But I wanted to ask if anyone has thoughts on this potential approach that I think would work for streams with those properties.
> 
> Imagine something like this:
> 
> - We could encode an update as a delete followed by an insert. For this to work, both need to be associated with the segment that contained the original record.
> 
> - In deep storage: one segment, instead of being one zip file, is multiple zip files, identified by a fileNumber. The first file looks like segments do today. Subsequent files include a normal-looking smoosh file like today's zips, but also include a bitmap of deleted row IDs from the previous file. Each zip with a fileNumber > 0 encodes incremental deletes (in the deleted-row bitmap) and new inserts (in the regular smoosh file). Let's call these "delta files" and call the one with fileNumber 0 the "original file". The delta files only contain deletes and inserts for primary keys that are present in the original file.
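
To make the file layout concrete, here is a rough sketch of how one such multi-file segment could be modeled (class names are purely illustrative, and using a roaring bitmap for the deleted-row IDs is an assumption, though Druid already uses roaring bitmaps for its indexes):

    import org.roaringbitmap.RoaringBitmap;
    import java.util.List;

    // Illustrative only: one logical segment made of an original file (fileNumber 0)
    // plus zero or more delta files, as described above. Rows are reduced to
    // List<Object[]> purely to show the shape of the data.
    public class MultiFileSegment {

        /** One zip in deep storage, identified by its fileNumber. */
        public static class SegmentFile {
            public final int fileNumber;              // 0 = original file, > 0 = delta file
            public final List<Object[]> rows;         // new inserts carried by this file
            public final RoaringBitmap deletedRowIds; // row IDs deleted from the previous file
                                                      // (empty for the original file)

            public SegmentFile(int fileNumber, List<Object[]> rows, RoaringBitmap deletedRowIds) {
                this.fileNumber = fileNumber;
                this.rows = rows;
                this.deletedRowIds = deletedRowIds;
            }
        }

        public final List<SegmentFile> files; // ordered by fileNumber, starting at 0

        public MultiFileSegment(List<SegmentFile> files) {
            this.files = files;
        }
    }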
> 
> - In the metadata store: we'll now have multiple rows per segment in the segment table (one for the original file and one per delta file). Same for the pending segment table. The datasource table (where Kafka topic checkpoints are stored) will need to associate checkpoints with the latest file number published for a given segment.
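
In other words, a row of the segment table might carry an extra fileNumber column, something like this hypothetical shape (not the actual metadata-store schema):

    // Hypothetical shape of a segment-table row under this proposal: the same
    // segment identifier can now appear once per file, distinguished by fileNumber.
    public class SegmentTableRow {
        public final String dataSource;
        public final String segmentId;  // time chunk + version + partition, as today
        public final int fileNumber;    // 0 for the original file, > 0 for delta files
        public final String loadSpec;   // pointer to the corresponding zip in deep storage

        public SegmentTableRow(String dataSource, String segmentId, int fileNumber, String loadSpec) {
            this.dataSource = dataSource;
            this.segmentId = segmentId;
            this.fileNumber = fileNumber;
            this.loadSpec = loadSpec;
        }
    }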
> 
> - On real-time indexers: they already have code paths that decide what segments to route incoming records to. We need to add an ability to route updates and deletes to new delta files for existing segments. But unlike regular segments, the indexers can't serve queries on top of the delta files. Queries must be served by the Historicals that are serving those segments (otherwise, the deltas won't get applied properly). So indexers will be creating and publishing these files but not serving them live.
> 
> - On historicals: they need to fetch all available delta files from deep storage for the segments they're serving. They should probably expose the entire segment (including all delta files) as a single StorageAdapter, so the query stack doesn't have to change.
> 
> Some additional notes:
> 
> - I left out the part about how indexers figure out what segment to add an update or delete record to. There has to be some kind of primary-key-to-segment mapping to make this work. The most obvious way to do this is to partition segments by primary key. But segments are currently partitioned by time, not primary key, and I suspect we wouldn't want to change that. One reason is that I think we still want to target use cases where there is a heavy time component, and time partitioning is ideal for that. Another is that we do expect older records to eventually stop receiving updates, and if we partition by time, it means most segments won't be receiving updates, which is good (it limits the performance hit of handling delta files to a small number of segments). So we'd need another solution. I haven't thought of a perfect one. I think we'll probably have to require that the input stream is partitioned by primary key, and take advantage of that to split up the information that indexers will need in order to map records to the correct segments. I think this area is the biggest unknown.
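
One way to picture the "partition the input stream by primary key" requirement: each indexer task would then only ever see a stable subset of keys, so it could keep a local key-to-segment mapping for routing updates and deletes. A sketch of that idea (the hashing scheme and the in-memory map are illustrative assumptions, not a worked-out design):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: route updates/deletes to the segment that holds the
    // original row, relying on the stream itself being hash-partitioned by key.
    public class PrimaryKeyRouter {
        private final int numStreamPartitions;
        private final Map<String, String> keyToSegmentId = new HashMap<>();

        public PrimaryKeyRouter(int numStreamPartitions) {
            this.numStreamPartitions = numStreamPartitions;
        }

        /** Which stream partition (and therefore which indexer task) owns this key. */
        public int streamPartitionFor(String primaryKey) {
            int hash = Arrays.hashCode(primaryKey.getBytes(StandardCharsets.UTF_8));
            return Math.floorMod(hash, numStreamPartitions);
        }

        /** Remember which segment a key was originally inserted into. */
        public void recordInsert(String primaryKey, String segmentId) {
            keyToSegmentId.put(primaryKey, segmentId);
        }

        /** Route a later update/delete for the same key to that segment's delta file. */
        public String segmentForUpdate(String primaryKey) {
            return keyToSegmentId.get(primaryKey); // null => key unknown to this task
        }
    }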
> 
> - Assuming we keep partitioning by time, it means the update mechanism would be able to update any field of a row except the timestamp (updating the timestamp would invalidate partitioning and sorting assumptions). I'm not sure if this would be generally OK or if it would rule out certain use cases.
> 
> - I left out what an update record from the stream would look like. Do we ask people to provide upserts (entire new record), or delta updates (differences from the previous record)? Or we could decide to support both. By the way: no matter how we accept incoming updates, I think we should store them in delta files as entire new records. That way we don't have to cross-check multiple files to see the latest record.
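
A tiny sketch of the "store entire new records" point: even if the stream delivers only a partial (delta) update, it can be merged with the previous version of the row before being written into the delta file (Map<String, Object> stands in for a row purely for brevity):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: turn a partial update into a full record so readers
    // never have to cross-check multiple files for the latest version of a row.
    public class UpsertMerger {
        public static Map<String, Object> toFullRecord(
                Map<String, Object> previousRecord,
                Map<String, Object> partialUpdate
        ) {
            Map<String, Object> merged = new HashMap<>(previousRecord);
            merged.putAll(partialUpdate); // updated fields overwrite the old values
            return merged;
        }
    }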
> 
> - This sketch of an implementation isn't actually real-time. It is able to accept updates and deletes from a stream, but there is a delay in actually serving them (a delta file needs to be generated and pushed to Historicals). One way to make the serving real-time too is for the indexers to provide an API for historicals to fetch a stream of delta records that are relevant to their segments. The historicals would need to use that stream to achieve the same effect as a delta file. Presumably, whatever it is they're doing with it would be replaced with the "official" delta files when they're available.
> 
> - For correctness, we'll need some mechanism for a historical to know when it has "enough" delta files to begin serving a segment. Otherwise you'll see mysterious rollbacks when segments move from one historical to another.
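
One simple form that check could take, assuming the historical can learn the latest published fileNumber for a segment from the coordinator or metadata store (that plumbing is assumed, not designed here):

    // Illustrative sketch: a historical only announces a segment once it has
    // loaded every file up to the latest published fileNumber. Assumes its
    // local files are contiguous from 0 up to highestLocalFileNumber.
    public class DeltaCompleteness {
        public static boolean readyToServe(int highestLocalFileNumber, int latestPublishedFileNumber) {
            return highestLocalFileNumber >= latestPublishedFileNumber;
        }
    }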
> 
> - For efficiency, we'll need to compact delta files into the base segments. We could use the existing compaction system for this.
> 
> - For performance, assuming we present the multifile thing as a single StorageAdapter, we'll need to be able to generate selectors that are as efficient as possible. This suggests we will need flags in the delta files that say whether they use the same dictionary as the original segment or not. If they do, we can generate a normal selector. If they don't, then we will presumably be allocating new dictionary IDs for new values. They won't be sorted anymore, so we'll need to generate a selector where ColumnCapabilities.areDictionaryValuesSorted returns false.
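
Conceptually, the per-column flag could drive that decision like this (names here are hypothetical, not Druid's actual column APIs):

    // Illustrative only: a column can keep advertising sorted dictionary values
    // only if every delta file for it reuses the original segment's dictionary.
    public class DeltaDictionaryFlags {
        public static boolean dictionaryValuesStillSorted(boolean[] reusesOriginalDictionaryPerDeltaFile) {
            for (boolean reusesOriginalDictionary : reusesOriginalDictionaryPerDeltaFile) {
                if (!reusesOriginalDictionary) {
                    // New values got fresh dictionary IDs appended out of order,
                    // so sorted-dictionary optimizations must be turned off.
                    return false;
                }
            }
            return true;
        }
    }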
> 
> Would love to hear what people think.
> 
> Gian