Posted to user@flink.apache.org by Armin Schnabel <ar...@cyface.de> on 2022/10/05 11:25:32 UTC
Support Checkpointing in MongoDB Changestream Source(s)
Dear Flink community,
My Flink pipeline listens for inserts into MongoDB GridFS, processes each
inserted document, and writes the result back into a MongoDB sink. This part
works so far.
Now I want to enable checkpointing, but after a day of reading the
documentation, training slides, Stack Overflow, archives, etc., I still don't
know how to make our source support checkpointing.
My main question is:
1. How can I make our source rewindable, so that after a crash the change
streams resume where the pipeline left off? (See the pipeline code below.)
Side questions/notes:
2. Is 1. possible with a reasonable amount of work, or would you recommend
adding Kafka between Flink and MongoDB?
(We're planning to replace MongoDB GridFS with Google Object Storage soon,
as our documents, currently a few MB each, will soon be GB-sized.)
3. I noticed that there are existing community MongoDB connectors, but I
don't think they are designed for our change stream use case:
a. [FLINK-6573] PR#20848
b. github/mongo-flink/mongo-flink
CODE:
Watch GridFS inserts on both collections, fs.files and fs.chunks, with:
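```
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.Document;
class MongoDBSource extends RichSourceFunction<ChangeStreamDocument<Document>> {
  private transient MongoCollection<Document> collection; // fs.files or fs.chunks, opened in open() (omitted)
  private volatile boolean running = true; // re-checked whenever tryNext() returns null
  public void run(SourceContext<ChangeStreamDocument<Document>> ctx) {
    var cursor = collection.watch(List.of(Aggregates.match(Filters.eq("operationType", "insert")))).iterator();
    while (running) { var change = cursor.tryNext(); if (change != null) ctx.collect(change); }
  }
  public void cancel() { running = false; }
}
```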
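One possible shape for the rewindable source from question 1, reduced to a plain-Java toy model so it runs without Flink or MongoDB. All class and method names here are hypothetical. The idea: remember the resume token of the last emitted event, persist it when a checkpoint is taken, and after a restore reopen the stream after that token. In a real job this would presumably map to a source that also implements Flink's `CheckpointedFunction` (token kept in operator `ListState`, emitting and updating the token under `ctx.getCheckpointLock()`) and reopens the change stream with `collection.watch(...).resumeAfter(token)`, the token coming from `ChangeStreamDocument.getResumeToken()`. That mapping is an untested assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a change stream event; in MongoDB the token is an opaque BsonDocument.
record ChangeEvent(String resumeToken, String payload) {}

// Hypothetical model of a rewindable source, NOT the Flink or MongoDB API.
class ResumableSourceModel {
  private final List<ChangeEvent> history; // events the server still retains (the oplog window)
  private String lastEmittedToken;         // in Flink: updated under the checkpoint lock
  private String checkpointedToken;        // in Flink: written to ListState in snapshotState()

  ResumableSourceModel(List<ChangeEvent> history) { this.history = history; }

  // Emits up to maxEvents events that come after lastEmittedToken (like resumeAfter(token)).
  // With no token yet the model starts at the beginning; a fresh MongoDB watch() starts at "now".
  List<String> run(int maxEvents) {
    int start = 0;
    if (lastEmittedToken != null) {
      // throws if the token is no longer in history, as a real resume would fail
      while (!history.get(start).resumeToken().equals(lastEmittedToken)) start++;
      start++; // resume *after* the token, not at it
    }
    List<String> out = new ArrayList<>();
    for (int i = start; i < history.size() && out.size() < maxEvents; i++) {
      out.add(history.get(i).payload());
      lastEmittedToken = history.get(i).resumeToken();
    }
    return out;
  }

  void snapshotState() { checkpointedToken = lastEmittedToken; }

  // After a crash only the checkpointed token survives; later emissions are forgotten.
  void restoreAfterFailure() { lastEmittedToken = checkpointedToken; }
}
```

Running it as emit `a, b`, checkpoint, emit `c`, crash and restore, then emit again yields `c, d`: events emitted after the last completed checkpoint are sent a second time. End to end this is at-least-once unless the MongoDB sink writes idempotently (e.g. upserts keyed by the file id) or transactionally, and resuming only works while the token's position is still in the oplog.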
Join the two change streams and process the events:
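```
DataStream<XYZ> stream(StreamExecutionEnvironment env) {
  // One change stream source per GridFS collection
  DataStream<ChangeStreamDocument<Document>> files = env.addSource(new MongoDBSource("fs.files"));
  DataStream<ChangeStreamDocument<Document>> chunks = env.addSource(new MongoDBSource("fs.chunks"));
  return files.join(chunks)
      .where(filesKeySelector)         // key of an fs.files event: its _id
      .equalTo(chunksFilesKeySelector) // key of an fs.chunks event: its files_id
      // processing time, as the sources assign no event timestamps or watermarks
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1)))
      .apply(new JoinFunction<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>,
          Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>>>() {
        @Override
        public Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>> join(
            ChangeStreamDocument<Document> filesChange, ChangeStreamDocument<Document> chunkChange) {
          return new Tuple2<>(filesChange, chunkChange);
        }
      })
      .process(new DocumentsMerger());
}
```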
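For reference, the pairs this join hands to `DocumentsMerger`: a Flink window join behaves like an inner join per window and emits every combination of matching elements. The plain-Java model below uses hypothetical `FileEvent`/`ChunkEvent` records standing in for the two change events, keyed on `_id` and `files_id` as in GridFS. One file with three chunks yields three pairs, and a chunk without a matching `fs.files` event in the same window is dropped silently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: fs.files events carry `_id`, fs.chunks events carry `files_id` and `n`.
record FileEvent(String id) {}
record ChunkEvent(String filesId, int n) {}

class WindowJoinModel {
  // Models one window: every (file, chunk) pair with equal keys is emitted (inner join).
  // Flink does not guarantee this output order; the model just iterates files, then chunks.
  static List<String> join(List<FileEvent> files, List<ChunkEvent> chunks) {
    List<String> pairs = new ArrayList<>();
    for (FileEvent f : files) {
      for (ChunkEvent c : chunks) {
        if (f.id().equals(c.filesId())) {
          pairs.add(f.id() + "/" + c.n());
        }
      }
    }
    return pairs; // chunks whose file is missing from the window produce nothing
  }
}
```

This is why `DocumentsMerger` receives one tuple per chunk and has to filter for the last one.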
Load the file from GridFS and process it:
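```
class DocumentsMerger extends
    ProcessFunction<Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>>, XYZ> {
  private transient GridFSBucket gridFSBucket; // opened in open() (omitted)

  @Override
  public void processElement(Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>> change,
      Context ctx, Collector<XYZ> out) throws Exception {
    // Wait for GridFS `fs.files` and all GridFS `fs.chunks` to be available:
    // only the pair carrying the file's last chunk (n == numChunks - 1) is processed
    Document chunk = change.f1.getFullDocument();
    ObjectId filesId = chunk.getObjectId("files_id");
    GridFSFile file = gridFSBucket.find(Filters.eq("_id", filesId)).first();
    if (file == null) return; // fs.files entry not visible yet
    long numChunks = (file.getLength() + file.getChunkSize() - 1) / file.getChunkSize();
    if (chunk.getInteger("n") != numChunks - 1) return;
    try (GridFSDownloadStream dataStream = gridFSBucket.openDownloadStream(filesId)) {
      out.collect(process(dataStream)); // process(): our own parsing into XYZ (omitted)
    }
  }
}
```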
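The last-chunk check in `DocumentsMerger` relies on GridFS chunk arithmetic: a file of `length` bytes is split into chunks of `chunkSize` bytes (255 KiB, i.e. 261120 bytes, by default), numbered from `n = 0`, and only the last chunk may be shorter. A small sketch with hypothetical helper names, using integer ceiling division:

```java
// Hypothetical helpers for GridFS chunk arithmetic (not part of the MongoDB driver).
final class GridFsChunks {
  private GridFsChunks() {}

  // Number of chunks, i.e. ceil(length / chunkSize), computed without floating point.
  static long chunkCount(long length, int chunkSize) {
    if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
    return (length + chunkSize - 1) / chunkSize;
  }

  // True if chunk index n is the file's last chunk, the condition that triggers processing.
  static boolean isLastChunk(int n, long length, int chunkSize) {
    return n == chunkCount(length, chunkSize) - 1;
  }
}
```

With the planned GB-sized documents this grows quickly: a 2 GiB file (2147483648 bytes) at the default chunk size has 8225 chunks (n = 0 to 8224), so the join produces up to 8225 pairs for that one file.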
I hope this is the right place for this question. If you prefer, I can also
move it to Stack Overflow and reference this mailing list thread there, so
that others can find it more easily.
Regards
Armin