Posted to dev@flink.apache.org by Steven Wu <st...@gmail.com> on 2022/08/16 16:53:00 UTC

Sink V2 interface replacement for GlobalCommitter

In the V1 sink interface, there is a GlobalCommitter, which Iceberg uses.
In the V2 sink interface, GlobalCommitter has been deprecated in favor of
WithPostCommitTopology. I thought the post-commit stage was mainly for async
maintenance (like compaction).

Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
It seems like a temporary transition plan for bridging v1 sinks to v2
interfaces.

private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
        implements WithPostCommitTopology<InputT, CommT> {
    @Override
    public void addPostCommitTopology(
            DataStream<CommittableMessage<CommT>> committables) {
        StandardSinkTopologies.addGlobalCommitter(
                committables,
                GlobalCommitterAdapter::new,
                () -> sink.getCommittableSerializer().get());
    }
}


In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
"global" partitioner to force all committables to go to a single committer
task (task 0). This effectively creates a global committer disguised among
the parallel committers. It is a little weird and can also raise the question
of why the other committer tasks are not getting any messages. Plus, it will
disable the future capability of a small file compaction stage post commit.
Hence, I am asking what the right approach is to achieve global committer
behavior.

Thanks,
Steven

[1] https://github.com/apache/iceberg/pull/4904/files#r946975047
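
To make the routing concern above concrete, here is a minimal sketch in plain Java. It is not Flink code: `ChannelSelector`, `GLOBAL`, `ROUND_ROBIN` and `distribute` are hypothetical names used only to model how a "global" partitioner sends every committable to committer subtask 0 while the remaining committer subtasks stay idle.

```java
import java.util.Arrays;

/** Toy model of channel selection between writers and committers; not Flink code. */
final class GlobalRoutingSketch {

    /** Hypothetical stand-in for a partitioner: picks the downstream subtask for a record. */
    interface ChannelSelector {
        int select(int recordIndex, int downstreamParallelism);
    }

    /** "Global" routing: every record goes to downstream subtask 0. */
    static final ChannelSelector GLOBAL = (recordIndex, parallelism) -> 0;

    /** Round-robin routing, shown only for contrast. */
    static final ChannelSelector ROUND_ROBIN =
            (recordIndex, parallelism) -> recordIndex % parallelism;

    /** Returns how many committables each committer subtask receives. */
    static int[] distribute(int committables, int committerParallelism, ChannelSelector selector) {
        int[] received = new int[committerParallelism];
        for (int i = 0; i < committables; i++) {
            received[selector.select(i, committerParallelism)]++;
        }
        return received;
    }

    public static void main(String[] args) {
        // With global routing only subtask 0 ever sees committables.
        System.out.println(Arrays.toString(distribute(8, 4, GLOBAL)));
        // With round-robin routing the committables are spread evenly.
        System.out.println(Arrays.toString(distribute(8, 4, ROUND_ROBIN)));
    }
}
```

In this model, a committer with parallelism 4 behind global routing does the work of a single global committer, which is the "disguised" behavior described in the message.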

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Liwei,
From my side I think it might require case-by-case discussion.
If some missing information blocks certain scenarios,
we could open dedicated issues / discussions for those scenarios
and discuss them in more depth.
Best,
Yun Gao
------------------------------------------------------------------
From:liwei li <hi...@gmail.com>
Send Time:2022 Oct. 10 (Mon.) 13:22
To:Steven Wu <st...@gmail.com>
Cc:Krzysztof Chmielewski <kr...@gmail.com>; Yun Gao <yu...@aliyun.com>; dev <de...@flink.apache.org>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
Thanks for the discussion.
I favor schemes that allow custom committer parallelism. This will help us make better use of the new unified sink.
As an additional point: in the current scheme it is not easy to get task information, such as the job ID or operator ID, in the writer and committer methods, because Flink does not expose it. This information is useful for saving snapshot information, naming files, and so on. Should we consider improving this? Of course, that is outside the scope of this thread, but I'm bringing it up for reference.
Thanks,
Liwei
On Thu, Sep 29, 2022 at 02:44 Steven Wu <stevenz3wu@gmail.com> wrote:
Yun,
> `writer (parallelism = n) -> committer (parallelism = 1)`
Yes, a single-parallelism committer would work for Iceberg (and probably other similar global transaction storage). This is how Iceberg sink is implemented today.
> When we are refactoring the job finish process in FLIP-147 to ensures all the records could be committed at the end of bounded streaming job, we have to desert the support for the cascade commits, which makes the cascade commit of `committer -> global committer` not work in all cases.
This is not a concern for Iceberg, as Iceberg doesn't need cascading "committers -> global committers".
Thanks,
Steven
On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yungao.gy@aliyun.com> wrote:
Hi all,
Very sorry for the long delay; the investigation took a bit of time.
@Krzysztof
The current unexpected behavior of GlobalCommitter
after failover is indeed caused by bugs in the implementation.
I filed an issue [1] for these bugs, and I think we may fix the issue
for 1.15.x, 1.16.x and 1.17.x before the next minor releases.
@Steven @Krzysztof
For the long-term solution (>= 1.17), by supporting changing the parallelism
of the Committer operator I initially meant that we could make the
parallelism of the committer operator configurable, so that it supports arbitrary parallelism:
1. For an ordinary sink that does not require a global committer, it is still `writer (parallelism = n) ->
 committer (parallelism = n)`.
2. If the v1 implementation does not require the committer, it could now be
 reimplemented as `writer (parallelism = n) -> committer (parallelism = 1)`.
3. If the v1 implementation requires the committer, there are several ways
 to reimplement it on v2, such as `writer -> pre-committer topology to do the rename ->
 committer (parallelism = 1)`.
Do you think this would be reasonable?
Also @Krzysztof, I have the same question as Steven: is it possible to write directly to
the final files and skip the renaming step? Before the metadata is written to the Delta Log,
I guess the files are not visible to users, so it should be safe to write directly to the final files?
Best,
Yun Gao
[1] https://issues.apache.org/jira/browse/FLINK-29459
------------------------------------------------------------------
From:Steven Wu <stevenz3wu@gmail.com>
Send Time:2022 Sep. 14 (Wed.) 21:33
To:Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
Cc:dev <dev@flink.apache.org>; Yun Gao <yungao.gy@aliyun.com>; hililiwei <hililiwei@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
Krzysztof, no worries. We are discussing the same topic (how to support storage with globally transactional commits).
> In Delta Sink connector we actually use both Committer [1] and GlobalCommitter [2]. The former, since we are using Flink's Parquet file writers, is doing a very simple job of "renaming the hidden file to make it visible and removing from the name some 'in-progress file' marker". The GlobalCommitter is committing data to the Delta Log.
I'm curious whether the writers can write the visible files directly (instead of writing hidden files that the committer then renames). Since there is a global committer that commits the data files when the Flink checkpoint completes, a job failure or restart shouldn't cause duplicate or lost data files. I have probably missed some context here.
On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
Hi Yun,
Thanks for your input.
In the Delta Sink connector we actually use both Committer [1] and GlobalCommitter [2]. The former, since we are using Flink's Parquet file writers, does the very simple job of "renaming the hidden file to make it visible and removing from the name some 'in-progress file' marker". The GlobalCommitter commits data to the Delta Log.
With this design, having many Committer instances actually benefits us. In addition, we foresee upcoming features in our connector that would benefit from separate Committers with a parallelism higher than 1.
How I understood your suggestion, Yun (and maybe it was a wrong interpretation), is to use both Committer and GlobalCommitter but to enforce parallelism 1 on the former. The GlobalCommitter created by Flink 1.15's SinkV1Adapter has parallelism 1, as expected and as it was in Flink < 1.15.
Anyway, I've played a little with the Flink code and managed to achieve this [3]. After some additional changes, which I describe below, our test described in [4] passed without any data loss and without any exceptions thrown by Flink.
However, changing the parallelism of the Committer operator to a hardcoded value of 1 was not enough, and I had to do two more things:
1. Add a rebalance step (RebalancePartitioner) to the graph between the writer and the committer, since they now have different parallelism and the default partitioner was FORWARD, which caused an exception to be thrown. BTW, this is clear and understood.
2. Modify Flink's CommittableCollectorSerializer [5], and this is, I believe, the important part.
The modification I had to make was caused by a "Duplicate Key" exception from the deserialize(int version, byte[] serialized) method, at line 143 of [5], where we process a stream of SubtaskCommittableManager objects and collect them into a Map. The map key is the subtaskId from the SubtaskCommittableManager object.
After Task Manager recovery, it may happen that the List of SubtaskCommittableManager processed in that deserialize method contains two SubtaskCommittableManager objects for the same subtask ID. In such a case I call the SubtaskCommittableManager.merge(...) method.
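
The failure mode and the workaround described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the Flink implementation: `SubtaskEntry` is a hypothetical stand-in for SubtaskCommittableManager, and the sketch assumes the restored entries are collected with `Collectors.toMap`, whose two-argument form throws an IllegalStateException on a duplicate key, while the three-argument form takes a merge function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DuplicateSubtaskMergeSketch {

    /** Hypothetical stand-in for a per-subtask committable manager. */
    static final class SubtaskEntry {
        final int subtaskId;
        final List<String> committables;

        SubtaskEntry(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = committables;
        }

        int subtaskId() {
            return subtaskId;
        }

        /** Combines the committables of two entries that belong to the same subtask. */
        SubtaskEntry merge(SubtaskEntry other) {
            List<String> all = new ArrayList<>(committables);
            all.addAll(other.committables);
            return new SubtaskEntry(subtaskId, all);
        }
    }

    /** Two-argument toMap: throws IllegalStateException if a subtask id occurs twice. */
    static Map<Integer, SubtaskEntry> collectStrict(List<SubtaskEntry> restored) {
        return restored.stream()
                .collect(Collectors.toMap(SubtaskEntry::subtaskId, e -> e));
    }

    /** Three-argument toMap: entries with the same subtask id are merged instead. */
    static Map<Integer, SubtaskEntry> collectMerging(List<SubtaskEntry> restored) {
        return restored.stream()
                .collect(Collectors.toMap(SubtaskEntry::subtaskId, e -> e, SubtaskEntry::merge));
    }

    public static void main(String[] args) {
        // After recovery, subtask 0 shows up twice in the restored list.
        List<SubtaskEntry> restored = Arrays.asList(
                new SubtaskEntry(0, Arrays.asList("file-a")),
                new SubtaskEntry(1, Arrays.asList("file-b")),
                new SubtaskEntry(0, Arrays.asList("file-c")));
        System.out.println(collectMerging(restored).get(0).committables);
    }
}
```

Whether two entries for the same subtask ID should exist at all after recovery is a separate question; the merge only keeps deserialization from failing.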
With those modifications our Delta test [7] started to pass on Flink 1.15.
I do not know whether setting the parallelism of the Committer to 1 is the right thing to do. As I mentioned, the Committer does some work in our Sink implementation, and we might have more use for it in upcoming features that would benefit from keeping its parallelism equal to the number of writers.
I still think there is some issue with the V2 architecture for topologies with GlobalCommitter and failover scenarios [4], and the duplicated key in [5] described above may be another symptom; maybe we should never have two entries for the same subtaskId. That I don't know.
P.S.
Steven, apologies for hijacking the thread a little bit.
Thanks,
Krzysztof Chmielewski
[1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
[2] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
[3] https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
[4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
[5] https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
[7] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
On Wed, 14 Sep 2022 at 05:26, Steven Wu <stevenz3wu@gmail.com> wrote:
> setting the committer parallelism to 1.
Yun, setting the parallelism to 1 is essentially a global committer. That would work. I'm not sure about the implications for other parts of the v2 sink interface.
On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
Hi Martijn
Could you clarify a little bit what you mean by:
"The important part to remember is that this
topology is lagging one checkpoint behind in terms of fault-tolerance: it
only receives data once the committer committed"
What are the implications? 
Thanks,
Krzysztof Chmielewski
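
One implication named in the quoted remark is that the global committer "needs to be idempotent". A minimal sketch of what that can look like, in plain Java and under loud assumptions: `FakeTable` and `commitIfNew` are hypothetical names, and recording the last committed checkpoint ID in the external table is just one possible way to detect a replayed commit after recovery. It is not presented as how Flink, Iceberg or Delta implement it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IdempotentGlobalCommitSketch {

    /** Hypothetical external table that stores the last committed checkpoint id with each commit. */
    static final class FakeTable {
        private long lastCommittedCheckpointId = -1L;
        private final List<String> files = new ArrayList<>();

        long lastCommittedCheckpointId() {
            return lastCommittedCheckpointId;
        }

        List<String> files() {
            return files;
        }

        /** Adds the files and records the checkpoint id as one (pretend) atomic transaction. */
        void commit(long checkpointId, List<String> newFiles) {
            files.addAll(newFiles);
            lastCommittedCheckpointId = checkpointId;
        }
    }

    /**
     * Commits the files unless the table already contains this checkpoint.
     * Returns true if a commit was made, false if the call was a replay.
     */
    static boolean commitIfNew(FakeTable table, long checkpointId, List<String> newFiles) {
        if (checkpointId <= table.lastCommittedCheckpointId()) {
            // Replayed after recovery: the external state already reflects this checkpoint.
            return false;
        }
        table.commit(checkpointId, newFiles);
        return true;
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        commitIfNew(table, 1L, Arrays.asList("file-a", "file-b"));
        // The same committables arrive again after a failover; nothing is added twice.
        commitIfNew(table, 1L, Arrays.asList("file-a", "file-b"));
        System.out.println(table.files());
    }
}
```

The design choice here is to let the external system be the source of truth for what has been committed, so a replay after failover becomes a no-op instead of a duplicate commit.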
On Tue, 13 Sep 2022 at 09:57, Yun Gao <yu...@aliyun.com.invalid> wrote:
Hi,
 Very sorry for the late reply; I was on holiday.
 Many thanks for the discussion. It also reminds me of
 one more piece of background on the change to the GlobalCommitter:
 when we were refactoring the job finish process in FLIP-147 to
 ensure all records could be committed at the end of a bounded
 streaming job, we had to drop support for cascading commits,
 which means the cascading commit of `committer -> global committer` does not work
 in all cases.
 For the current issues, one possible alternative from my side is to
 support setting the committer parallelism to 1. Could this option solve
 the issue in the current scenarios? I'll also double-check whether
 it can be implemented, and look into the failed tests Krzysztof encountered.
 Best,
 Yun
 ------------------------------------------------------------------
 From:Steven Wu <stevenz3wu@gmail.com>
 Send Time:2022 Sep. 10 (Sat.) 11:31
 To:dev <dev@flink.apache.org>
 Cc:Yun Gao <yungao.gy@aliyun.com>; hililiwei <hililiwei@gmail.com>
 Subject:Re: Sink V2 interface replacement for GlobalCommitter
 Martijn, thanks a lot for chiming in!
 Here are my concerns with adding the GlobalCommitter in the PostCommitTopology:
 1. When we use TwoPhaseCommittingSink, we would need to create a no-op/dummy committer, and the actual Iceberg/DeltaLake commits would happen in the PostCommit stage. The PostCommit stage should be doing work after the commit, not the commit itself.
 2. GlobalCommitter is marked as @deprecated. It will be removed at some point. What then?
 Thanks,
 Steven
 On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
 Thanks Martijn,
 I'm actually trying to run our V1 Delta connector on Flink 1.15 using
 SinkV1Adapter with GlobalCommitterOperator.
 Having said that, I might have found a potential issue with
 GlobalCommitterOperator, checkpointing and failover recovery [1].
 For "normal" scenarios it does look good though.
 Regards,
 Krzysztof Chmielewski
 [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
 On Fri, 9 Sep 2022 at 20:49, Martijn Visser <martijnvisser@apache.org> wrote:
 > Hi all,
 >
 > A couple of bits from when work was being done on the new sink: V1 is
 > completely simulated as V2 [1]. V2 is strictly more expressive.
 >
 > If there's desire to stick to the `GlobalCommitter` interface, have a
 > look at the StandardSinkTopologies. Or you can just add your own more
 > fitting PostCommitTopology. The important part to remember is that this
 > topology is lagging one checkpoint behind in terms of fault-tolerance: it
 > only receives data once the committer committed
 > on notifyCheckpointComplete. Thus, the global committer needs to be
 > idempotent and able to restore the actual state on recovery. That
 > limitation is coming in from Flink's checkpointing behaviour and applies to
 > both V1 and V2. GlobalCommitterOperator is abstracting these issues along
 > with handling retries (so commits that happen much later). So it's probably
 > a good place to start just with the standard topology.
 >
 > Best regards,
 >
 > Martijn
 >
 > [1]
 >
 > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
 >
 > On Thu, 8 Sep 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
 >
 > > Hi,
 > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
 > > community here [2].
 > >
 > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
 > > exactly what the Flink-Delta Sink needs, since it is the place where
 > > we do the actual commit to the Delta Log, which should be done from a
 > > single place/instance.
 > >
 > > Currently I'm evaluating V2 for our connector, and having, as Steven
 > > described it, a "more natural, built-in concept/support of GlobalCommitter
 > > in the sink v2 interface" would be greatly appreciated.
 > >
 > > Cheers,
 > > Krzysztof Chmielewski
 > >
 > > [1] https://github.com/kristoffSC
 > > [2] https://github.com/delta-io/connectors/tree/master/flink
 > >
 > > On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz3wu@gmail.com> wrote:
 > >
 > > > Hi Yun,
 > > >
 > > > Thanks a lot for the reply!
 > > >
 > > > While we can add the global committer in the WithPostCommitTopology, the
 > > > semantics are weird. The Commit stage actually didn't commit anything to
 > > > the Iceberg table, and the PostCommit stage is where the Iceberg commit
 > > > happens.
 > > >
 > > > I just took a quick look at DeltaLake Flink sink. It still uses the V1 sink
 > > > interface [1]. I think it might have the same issue when switching to the
 > > > V2 sink interface.
 > > >
 > > > For data lake storages (like Iceberg, DeltaLake) or any storage with global
 > > > transactional commit, it would be more natural to have a built-in
 > > > concept/support of GlobalCommitter in the sink v2 interface.
 > > >
 > > > Thanks,
 > > > Steven
 > > >
 > > > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
 > > >
 > > >
 > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid>
 > > > wrote:
 > > >
 > > > > Hi Steven, Liwei,
 > > > > Very sorry for missing this mail and responding so late.
 > > > > I think the initial thought was indeed to use `WithPostCommitTopology` as
 > > > > a replacement for the original GlobalCommitter, and currently the adapter
 > > > > of Sink v1 on top of Sink v2 also maps the GlobalCommitter in the Sink V1
 > > > > interface onto an implementation of `WithPostCommitTopology`.
 > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems
 > > > > to me it could support both the global committer and small file
 > > > > compaction? We might have a `WithPostCommitTopology` implementation like
 > > > > DataStream ds = add global committer;
 > > > > if (enable file compaction) {
 > > > >     build the compaction subgraph from ds
 > > > > }
 > > > > Best,
 > > > > Yun
 > > > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
 > > > > ------------------------------------------------------------------
 > > > > From:Steven Wu <stevenz3wu@gmail.com>
 > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
 > > > > To:dev <dev@flink.apache.org>; hililiwei <hililiwei@gmail.com>
 > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
 > > > > > Plus, it will disable the future capability of small file compaction
 > > > > > stage post commit.
 > > > > I should clarify this comment. If we are using the
 > > > > `WithPostCommitTopology` for the global committer, we would lose the
 > > > > capability of using the post-commit stage for small file compaction.
-- 
liwei li

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Martijn Visser <ma...@apache.org>.
Hi Liwei,

I think it's indeed better to have a separate discussion thread on that.

Best regards,

Martijn

On Mon, Oct 10, 2022 at 7:22 AM liwei li <hi...@gmail.com> wrote:

> Thanks for the discussion.
>
> Favor schemes that allow for custom committer parallelism. This will help
> us better use the new unified sink.
>
> As an added point, in the current scheme, if I want to get some task
> information, such as jobid operatorid, in writer and commiter methods, it
> is not easy. flink does not expose this information. But this information
> is useful for saving snapshot information, naming files, and so on. Should
> we consider improving it? Of course. That's not what's in this thread, but
> I'm bringing it up for reference.
>
> Thanks,
> Liwei
>
> On Thu, Sep 29, 2022 at 02:44 Steven Wu <st...@gmail.com> wrote:
>
> > Yun,
> >
> > > `writer (parallelism = n) -> committer (parallelism = 1)`
> >
> > Yes, a single-parallelism committer would work for Iceberg (and probably
> > other similar global transaction storage). This is how Iceberg sink is
> > implemented today.
> >
> > > When we are refactoring the job finish process in FLIP-147 toensures
> > all the records could be committed at the end of boundedstreaming job, we
> > have to desert the support for the cascade commits, which makes the
> > cascade commit of `committer -> global committer` not work in all cases.
> >
> > This is not a concern for Iceberg, as Iceberg doesn't need cascading
> > "committers -> global committers".
> >
> > Thanks,
> > Steven
> >
> >
> > On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yu...@aliyun.com> wrote:
> >
> >> Hi all,
> >>
> >> Very sorry for the long delay for it took a bit of time to do the
> >> investigation.
> >>
> >> @Krzysztof
> >> For the current unexpected behavior of GlobalCommitter
> >> after failover, it is indeed caused by the bugs in implementation,
> >> I filed an issue[1] for these bugs and I think we may fix the issue
> >> for 1.15.x, 1.16.x and 1.17.x before the next minor releases.
> >>
> >> @Steven @Krzysztof
> >> For the long-term solution >= 1.17, for supporting changing the
> >> parallelism
> >> of the Committer operator, I initially meant that we could set the
> >> parallelism of the committer operator, thus it could support arbitrary
> >> parallelism:
> >>
> >> 1. For ordinary sink that does not require global committer, it is still
> >> `writer (parallelism = n) ->
> >>     committer (parallelism = n)`.
> >> 2. If the committer is not required in the implementation of v1, now it
> >> could be
> >>     reimplemented with `writer (parallelism = n) -> committer
> >> (parallelism = 1)`.
> >> 3. If the committer is required in the implementation of v1, now it has
> >> several ways
> >>     to re-implement on v2, like `writer -> pre-committer topology to do
> >> the rename ->
> >>     commiter  (parallelism = 1)`.
> >>
> >> Do you think this would be reasonable?
> >>
> >> Also @Krzysztof I have the same question with Steven that is it possible
> >> directly write to
> >> the formal files and skip the step of renaming? Since before the meta is
> >> written to the Delta Log
> >> I guess the files are not visible to the users, thus it is safe to
> >> directly write to the formal files?
> >>
> >> Best,
> >> Yun Gao
> >>
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-29459
> >>
> >>
> >>
> >> ------------------------------------------------------------------
> >> From:Steven Wu <st...@gmail.com>
> >> Send Time:2022 Sep. 14 (Wed.) 21:33
> >> To:Krzysztof Chmielewski <kr...@gmail.com>
> >> Cc:dev <de...@flink.apache.org>; Yun Gao <yu...@aliyun.com>;
> hililiwei
> >> <hi...@gmail.com>
> >> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> >>
> >> Krzysztof, no worries. We are discussing the same topic (how to support
> >> storage with globally transactional commits).
> >>
> >> > In Delta Sink connector we actually use both Committer [1] and
> >> GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> >> writers is doing a very simple job of "of renaming the hidden file to
> make
> >> it visible and removing from the name some 'in-progress file' marker".
> The
> >> GlobalCommitter is committing data to the Delta Log.
> >>
> >> Curious if the writers can write the visible files directly (vs hidden
> >> files first then renamed by committer). Since there is a global
> committer
> >> to commit the data files when Flink checkpoint completes, job failure or
> >> restart shouldn't cause data file dups or loss. I probably missed some
> >> context here.
> >>
> >> On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <
> >> krzysiek.chmielewski@gmail.com> wrote:
> >> Hi Yun,
> >> Thanks for your input.
> >>
> >> In the Delta Sink connector we actually use both a Committer [1] and a
> >> GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> >> writers, does the very simple job of "renaming the hidden file to make it
> >> visible and removing from the name some 'in-progress file' marker". The
> >> GlobalCommitter commits data to the Delta Log.
> >>
> >> With this design, having many Committer instances actually benefits us.
> >> We also foresee future features in our connector that would benefit from
> >> separate Committers with a parallelism higher than 1.
> >>
> >> How I understood your suggestion, Yun (and maybe it was a wrong
> >> interpretation), is to use both the Committer and the GlobalCommitter but
> >> to enforce parallelism 1 on the former. The GlobalCommitter created by
> >> Flink 1.15's SinkV1Adapter has parallelism 1, as expected and as it was in
> >> Flink < 1.15.
> >>
> >> Anyway, I've played a little bit with the Flink code and I managed to
> >> achieve this [3]. After some additional changes, which I describe below,
> >> our test described in [4] passed without any data loss and without any
> >> exceptions thrown by Flink.
> >>
> >> However, changing the parallelism of the Committer operator to a hardcoded
> >> value of 1 was not enough, and I had to do two more things:
> >> 1. Add a rebalance step (RebalancePartitioner) to the graph between the
> >>    writer and the committer, since they now have different parallelism
> >>    and the default partitioner was FORWARD, which caused an exception to
> >>    be thrown. BTW, this is clear and understood.
> >> 2. Modify Flink's CommittableCollectorSerializer [5], and this is, I
> >>    believe, an important thing.
> >>
> >> The modification I had to make was caused by a "Duplicate Key" exception
> >> from the deserialize(int version, byte[] serialized) method, at line 143
> >> of [5], where we process a stream of SubtaskCommittableManager objects and
> >> collect it into a Map. The map key is the subtaskId from the
> >> SubtaskCommittableManager object.
> >>
> >> After Task Manager recovery it may happen that the List of
> >> SubtaskCommittableManager objects processed in that deserialize method
> >> contains two SubtaskCommittableManager objects for the same subtask ID.
> >> For such a case I call the SubtaskCommittableManager.merge(...) method.
> >>
> >> With those modifications our Delta test [7] started to pass on Flink 1.15.
> >>
> >> I do not know whether setting the parallelism of the Committer to 1 is
> >> the right thing to do. As I mentioned, the Committer does some work in our
> >> Sink implementation, and we might have more uses for it in future features
> >> that would benefit from keeping its parallelism equal to the writer count.
> >>
> >> I still think there is some issue with the V2 architecture for topologies
> >> with a GlobalCommitter in failover scenarios [4], and the duplicated key
> >> in [5] described above may be another case of it; maybe we should never
> >> have two entries for the same subtaskId. That I don't know.
> >>
> >> P.S.
> >> Steven, apologies for hijacking the thread a little bit.
> >>
> >> Thanks,
> >> Krzysztof Chmielewski
> >>
> >> [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
> >> [2] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> >> [3] https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
> >> [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
> >> [5] https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
> >> [7] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
> >>
> >> On Wed, Sep 14, 2022 at 05:26 Steven Wu <st...@gmail.com> wrote:
> >> > setting the committer parallelism to 1.
> >>
> >> Yun, setting the parallelism to 1 is essentially a global committer. That
> >> would work. I'm not sure about the implications for other parts of the v2
> >> sink interface.
> >>
> >> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
> >> krzysiek.chmielewski@gmail.com> wrote:
> >> Hi Martijn,
> >> Could you clarify a little bit what you mean by:
> >>
> >> "The important part to remember is that this
> >> topology is lagging one checkpoint behind in terms of fault-tolerance: it
> >> only receives data once the committer committed"
> >>
> >> What are the implications?
> >>
> >> Thanks,
> >> Krzysztof Chmielewski
> >>
> >> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yu...@aliyun.com.invalid> wrote:
> >> Hi,
> >> Very sorry for the late reply; I was on holiday.
> >> Many thanks for the discussion. It also reminds me of one more piece of
> >> background on the change to the GlobalCommitter:
> >> When we refactored the job finish process in FLIP-147 to ensure that all
> >> records could be committed at the end of a bounded streaming job, we had
> >> to drop support for cascading commits, which means the cascading commit
> >> `committer -> global committer` does not work in all cases.
> >> For the current issues, one possible alternative from my side is that we
> >> may support setting the committer parallelism to 1. Could this option
> >> solve the issue in the current scenarios? I'll also double check whether
> >> it can be implemented, and look into the failed tests Krzysztof ran into.
> >> Best,
> >> Yun
> >> ------------------------------------------------------------------
> >> From:Steven Wu <st...@gmail.com>
> >> Send Time:2022 Sep. 10 (Sat.) 11:31
> >> To:dev <de...@flink.apache.org>
> >> Cc:Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
> >> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> >> Martijn, thanks a lot for chiming in!
> >> Here are my concerns with adding the GlobalCommitter in the
> >> PostCommitTopology:
> >> 1. When we use TwoPhaseCommittingSink, we would need to create a
> >>    noop/dummy committer. The actual Iceberg/DeltaLake commits happen in
> >>    the PostCommit stage. The PostCommit stage should be doing some work
> >>    after the commit (not for the commit).
> >> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
> >>    certain point. What then?
> >> Thanks,
> >> Steven
> >> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
> >> krzysiek.chmielewski@gmail.com> wrote:
> >> Thanks Martijn,
> >>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
> >>  SinkV1Adapter with GlobalCommitterOperator.
> >>  Having said that, I might have found a potential issue with
> >>  GlobalCommitterOperator, checkpointing and failover recovery [1].
> >>  For "normal" scenarios it does look good though.
> >>  Regards,
> >>  Krzysztof Chmielewski
> >>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
> >>  On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvisser@apache.org>
> >>  wrote:
> >>  > Hi all,
> >>  >
> >>  > A couple of bits from when work was being done on the new sink: V1 is
> >>  > completely simulated as V2 [1]. V2 is strictly more expressive.
> >>  >
> >>  > If there's desire to stick to the `GlobalCommitter` interface, have a
> >>  > look at the StandardSinkTopologies. Or you can just add your own more
> >>  > fitting PostCommitTopology. The important part to remember is that this
> >>  > topology is lagging one checkpoint behind in terms of fault-tolerance:
> >>  > it only receives data once the committer committed on
> >>  > notifyCheckpointComplete. Thus, the global committer needs to be
> >>  > idempotent and able to restore the actual state on recovery. That
> >>  > limitation is coming in from Flink's checkpointing behaviour and
> >>  > applies to both V1 and V2. GlobalCommitterOperator is abstracting these
> >>  > issues along with handling retries (so commits that happen much later).
> >>  > So it's probably a good place to start just with the standard topology.
> >>  >
> >>  > Best regards,
> >>  >
> >>  > Martijn
> >>  >
> >>  > [1]
> >>  > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> >>  >
> >>  > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <
> >>  > krzysiek.chmielewski@gmail.com> wrote:
> >>  >
> >>  > > Hi,
> >>  > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
> >>  > > community here [2].
> >>  > >
> >>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
> >>  > > exactly what the Flink-Delta Sink needs, since it is the place where
> >>  > > we do the actual commit to the Delta Log, which should be done from a
> >>  > > single place/instance.
> >>  > >
> >>  > > Currently I'm evaluating V2 for our connector, and having, as Steven
> >>  > > described it, a "more natural, built-in concept/support of
> >>  > > GlobalCommitter in the sink v2 interface" would be greatly
> >>  > > appreciated.
> >>  > >
> >>  > > Cheers,
> >>  > > Krzysztof Chmielewski
> >>  > >
> >>  > > [1] https://github.com/kristoffSC
> >>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
> >>  > >
> >>  > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz3wu@gmail.com> wrote:
> >>  > >
> >>  > > > Hi Yun,
> >>  > > >
> >>  > > > Thanks a lot for the reply!
> >>  > > >
> >>  > > > While we can add the global committer in the
> >>  > > > WithPostCommitTopology, the semantics are weird. The Commit stage
> >>  > > > actually didn't commit anything to the Iceberg table, and the
> >>  > > > PostCommit stage is where the Iceberg commit happens.
> >>  > > >
> >>  > > > I just took a quick look at DeltaLake Flink sink. It still uses
> >>  > > > the V1 sink interface [1]. I think it might have the same issue
> >>  > > > when switching to the V2 sink interface.
> >>  > > >
> >>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage
> >>  > > > with global transactional commit, it would be more natural to have
> >>  > > > a built-in concept/support of GlobalCommitter in the sink v2
> >>  > > > interface.
> >>  > > >
> >>  > > > Thanks,
> >>  > > > Steven
> >>  > > >
> >>  > > > [1]
> >>  > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> >>  > > >
> >>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid>
> >>  > > > wrote:
> >>  > > >
> >>  > > > > Hi Steven, Liwei,
> >>  > > > > Very sorry for missing this mail and responding so late.
> >>  > > > > I think the initial thought was indeed to use
> >>  > > > > `WithPostCommitTopology` as a replacement for the original
> >>  > > > > GlobalCommitter, and currently the adapter of Sink v1 on top of
> >>  > > > > Sink v2 also maps the GlobalCommitter of the Sink V1 interface
> >>  > > > > onto an implementation of `WithPostCommitTopology` [1].
> >>  > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
> >>  > > > > seems to me that it could support both a global committer and
> >>  > > > > small file compaction? We might have a `WithPostCommitTopology`
> >>  > > > > implementation like
> >>  > > > > DataStream ds = add global committer;
> >>  > > > > if (enable file compaction) {
> >>  > > > >     build the compaction subgraph from ds
> >>  > > > > }
> >>  > > > > Best,
> >>  > > > > Yun
> >>  > > > > [1]
> >>  > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> >>  > > > >
> >> ------------------------------------------------------------------
> >>  > > > > From:Steven Wu <stevenz3wu@gmail.com>
> >>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
> >>  > > > > To:dev <dev@flink.apache.org>; hililiwei <hililiwei@gmail.com>
> >>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
> >>  > > > > > Plus, it will disable the future capability of small file
> >>  > > > > > compaction stage post commit.
> >>  > > > > I should clarify this comment. If we are using the
> >>  > > > > `WithPostCommitTopology` for the global committer, we would lose
> >>  > > > > the capability of using the post commit stage for small file
> >>  > > > > compaction.
> >>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com>
> >>  > > > > wrote:
> >>  > > > > >
> >>  > > > > > In the V1 sink interface, there is a GlobalCommitter for
> >>  > > > > > Iceberg. With the V2 sink interface, GlobalCommitter has been
> >>  > > > > > deprecated by WithPostCommitTopology. I thought the post commit
> >>  > > > > > stage is mainly for async maintenance (like compaction).
> >>  > > > > >
> >>  > > > > > Are we supposed to do something similar to the
> >>  > > > > > GlobalCommittingSinkAdapter? It seems like a temporary
> >>  > > > > > transition plan for bridging v1 sinks to v2 interfaces.
> >>  > > > > >
> >>  > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> >>  > > > > >         implements WithPostCommitTopology<InputT, CommT> {
> >>  > > > > >     @Override
> >>  > > > > >     public void addPostCommitTopology(
> >>  > > > > >             DataStream<CommittableMessage<CommT>> committables) {
> >>  > > > > >         StandardSinkTopologies.addGlobalCommitter(
> >>  > > > > >                 committables,
> >>  > > > > >                 GlobalCommitterAdapter::new,
> >>  > > > > >                 () -> sink.getCommittableSerializer().get());
> >>  > > > > >     }
> >>  > > > > > }
> >>  > > > > >
> >>  > > > > >
> >>  > > > > > In the Iceberg PR [1] for adopting the new sink interface,
> >>  > > > > > Liwei used the "global" partitioner to force all committables
> >>  > > > > > to go to a single committer task 0. It will effectively force a
> >>  > > > > > global committer disguised in the parallel committers. It is a
> >>  > > > > > little weird and can also lead to questions about why other
> >>  > > > > > committer tasks are not getting any messages. Plus, it will
> >>  > > > > > disable the future capability of small file compaction stage
> >>  > > > > > post commit. Hence, I am asking what is the right approach to
> >>  > > > > > achieve global committer behavior.
> >>  > > > > >
> >>  > > > > > Thanks,
> >>  > > > > > Steven
> >>  > > > > >
> >>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
> >>  > > > > >
> >>  > > > >
> >>  > > >
> >>  > >
> >>  >
> >>
> >>
> >> --
> liwei li
>

Re: Sink V2 interface replacement for GlobalCommitter

Posted by liwei li <hi...@gmail.com>.
Thanks for the discussion.

I favor schemes that allow a custom committer parallelism. This will help
us make better use of the new unified sink.

As an additional point: in the current scheme, it is not easy to get task
information, such as the job ID or operator ID, in the writer and committer
methods, because Flink does not expose it. This information is useful for
saving snapshot information, naming files, and so on. Should we consider
improving this? It is outside the scope of this thread, but I'm bringing it
up for reference.

Thanks,
Liwei

On Thu, Sep 29, 2022 at 02:44 Steven Wu <st...@gmail.com> wrote:

> Yun,
>
> > `writer (parallelism = n) -> committer (parallelism = 1)`
>
> Yes, a single-parallelism committer would work for Iceberg (and probably
> other similar global transaction storage). This is how Iceberg sink is
> implemented today.
>
> > When we are refactoring the job finish process in FLIP-147 to ensure
> all the records could be committed at the end of bounded streaming job, we
> have to desert the support for the cascade commits, which makes the
> cascade commit of `committer -> global committer` not work in all cases.
>
> This is not a concern for Iceberg, as Iceberg doesn't need cascading
> "committers -> global committers".
>
> Thanks,
> Steven
>
>
> On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yu...@aliyun.com> wrote:
>
>> Hi all,
>>
>> Very sorry for the long delay; the investigation took a bit of time.
>>
>> @Krzysztof
>> The current unexpected behavior of the GlobalCommitter after failover is
>> indeed caused by bugs in the implementation. I filed an issue [1] for
>> these bugs, and I think we may fix it for 1.15.x, 1.16.x and 1.17.x before
>> the next minor releases.
>>
>> @Steven @Krzysztof
>> For the long-term solution (>= 1.17), to support changing the parallelism
>> of the Committer operator, I initially meant that we could make the
>> parallelism of the committer operator configurable, so that it supports
>> arbitrary parallelism:
>>
>> 1. For an ordinary sink that does not require a global committer, it is
>>     still `writer (parallelism = n) -> committer (parallelism = n)`.
>> 2. If the committer is not required in the v1 implementation, it can now
>>     be reimplemented as `writer (parallelism = n) -> committer
>>     (parallelism = 1)`.
>> 3. If the committer is required in the v1 implementation, there are
>>     several ways to reimplement it on v2, such as `writer -> pre-committer
>>     topology to do the rename -> committer (parallelism = 1)`.
>>
>> Do you think this would be reasonable?
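As an illustration of option 2 above, here is a minimal, self-contained Java
sketch of the `writer (parallelism = n) -> committer (parallelism = 1)` shape.
It deliberately does not use the Flink API; SingleCommitterSketch, DataFile,
runWriters and commitAll are hypothetical names chosen for this example only.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of writer (parallelism = n) -> committer (parallelism = 1). Not Flink API. */
final class SingleCommitterSketch {

    /** Hypothetical committable: one data file written by one writer subtask. */
    static final class DataFile {
        final int writerSubtask;
        final long checkpointId;

        DataFile(int writerSubtask, long checkpointId) {
            this.writerSubtask = writerSubtask;
            this.checkpointId = checkpointId;
        }
    }

    /** Each of the n writer subtasks emits one committable for the checkpoint. */
    static List<DataFile> runWriters(int parallelism, long checkpointId) {
        List<DataFile> committables = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            committables.add(new DataFile(subtask, checkpointId));
        }
        return committables;
    }

    /** The single committer receives every committable and publishes them as one transaction. */
    static String commitAll(long checkpointId, List<DataFile> committables) {
        return "checkpoint " + checkpointId + ": committed " + committables.size() + " files";
    }

    public static void main(String[] args) {
        System.out.println(commitAll(7L, runWriters(4, 7L)));
    }
}
```

Because only one committer instance exists, it sees the output of all writers
for a checkpoint, which is what a globally transactional table commit needs.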
>>
>> Also @Krzysztof, I have the same question as Steven: is it possible to
>> write directly to the final files and skip the renaming step? Since the
>> files are presumably not visible to users before the metadata is written
>> to the Delta Log, it should be safe to write directly to the final files?
>>
>> Best,
>> Yun Gao
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-29459
>>
>>
>>
>> ------------------------------------------------------------------
>> From:Steven Wu <st...@gmail.com>
>> Send Time:2022 Sep. 14 (Wed.) 21:33
>> To:Krzysztof Chmielewski <kr...@gmail.com>
>> Cc:dev <de...@flink.apache.org>; Yun Gao <yu...@aliyun.com>; hililiwei
>> <hi...@gmail.com>
>> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>
>> Krzysztof, no worries. We are discussing the same topic (how to support
>> storage with globally transactional commits).
>>
>> > In Delta Sink connector we actually use both Committer [1] and
>> > GlobalCommitter [2]. The former, since we are using Flink's Parquet file
>> > writers, is doing a very simple job of "renaming the hidden file to make
>> > it visible and removing from the name some 'in-progress file' marker". The
>> > GlobalCommitter is committing data to the Delta Log.
>>
>> Curious if the writers can write the visible files directly (vs hidden
>> files first then renamed by committer). Since there is a global committer
>> to commit the data files when Flink checkpoint completes, job failure or
>> restart shouldn't cause data file dups or loss. I probably missed some
>> context here.
>>
>> On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <
>> krzysiek.chmielewski@gmail.com> wrote:
>> Hi Yun,
>> Thanks for your input.
>>
>> In the Delta Sink connector we actually use both a Committer [1] and a
>> GlobalCommitter [2]. The former, since we are using Flink's Parquet file
>> writers, does the very simple job of "renaming the hidden file to make it
>> visible and removing from the name some 'in-progress file' marker". The
>> GlobalCommitter commits data to the Delta Log.
>>
>> With this design, having many Committer instances actually benefits us.
>> We also foresee future features in our connector that would benefit from
>> separate Committers with a parallelism higher than 1.
>>
>> How I understood your suggestion, Yun (and maybe it was a wrong
>> interpretation), is to use both the Committer and the GlobalCommitter but
>> to enforce parallelism 1 on the former. The GlobalCommitter created by
>> Flink 1.15's SinkV1Adapter has parallelism 1, as expected and as it was in
>> Flink < 1.15.
>>
>> Anyway, I've played a little bit with the Flink code and I managed to
>> achieve this [3]. After some additional changes, which I describe below,
>> our test described in [4] passed without any data loss and without any
>> exceptions thrown by Flink.
>>
>> However, changing the parallelism of the Committer operator to a hardcoded
>> value of 1 was not enough, and I had to do two more things:
>> 1. Add a rebalance step (RebalancePartitioner) to the graph between the
>>    writer and the committer, since they now have different parallelism
>>    and the default partitioner was FORWARD, which caused an exception to
>>    be thrown. BTW, this is clear and understood.
>> 2. Modify Flink's CommittableCollectorSerializer [5], and this is, I
>>    believe, an important thing.
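To illustrate point 1, here is a simplified Java model (not Flink's
partitioner classes) of why a forward connection cannot be used once the
writer and committer parallelism differ, while a round-robin rebalance can.
The class and method names are hypothetical, and the round-robin here always
starts at channel 0, which is a simplification made for the example.

```java
/** Simplified model of forward vs. rebalance channel selection. Not Flink API. */
final class PartitionerSketch {

    /** Forward keeps records on the same subtask index, so both sides must have equal parallelism. */
    static int forwardChannel(int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "forward partitioning does not allow a change of parallelism");
        }
        return upstreamSubtask;
    }

    /** Rebalance distributes records round-robin over however many downstream subtasks exist. */
    static int rebalanceChannel(long recordIndex, int downstreamParallelism) {
        return (int) (recordIndex % downstreamParallelism);
    }

    public static void main(String[] args) {
        // With a single committer, every record ends up on channel 0.
        System.out.println(rebalanceChannel(5L, 1));
    }
}
```

With four writers and one committer, forwardChannel(2, 4, 1) throws, whereas
rebalanceChannel routes every record to the only downstream channel.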
>>
>> The modification I had to make was caused by a "Duplicate Key" exception
>> from the deserialize(int version, byte[] serialized) method, at line 143
>> of [5], where we process a stream of SubtaskCommittableManager objects and
>> collect it into a Map. The map key is the subtaskId from the
>> SubtaskCommittableManager object.
>>
>> After Task Manager recovery it may happen that the List of
>> SubtaskCommittableManager objects processed in that deserialize method
>> contains two SubtaskCommittableManager objects for the same subtask ID.
>> For such a case I call the SubtaskCommittableManager.merge(...) method.
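The "Duplicate Key" failure and the merge workaround can be reproduced in
plain Java, under the assumption that the map is built with a collector such
as Collectors.toMap, which throws IllegalStateException when two elements
share a key. SubtaskState is a hypothetical stand-in for
SubtaskCommittableManager and models only a subtask id and a pending count.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java illustration of the duplicate-key problem and the merge workaround. */
final class DuplicateSubtaskMergeSketch {

    /** Hypothetical stand-in for SubtaskCommittableManager. */
    static final class SubtaskState {
        final int subtaskId;
        final int pending;

        SubtaskState(int subtaskId, int pending) {
            this.subtaskId = subtaskId;
            this.pending = pending;
        }

        /** Combines two states that belong to the same subtask id. */
        SubtaskState merge(SubtaskState other) {
            return new SubtaskState(subtaskId, pending + other.pending);
        }
    }

    /** Two-argument toMap: throws IllegalStateException if a subtask id occurs twice. */
    static Map<Integer, SubtaskState> collectStrict(List<SubtaskState> states) {
        return states.stream().collect(Collectors.toMap(s -> s.subtaskId, s -> s));
    }

    /** Three-argument toMap: entries with the same subtask id are merged instead. */
    static Map<Integer, SubtaskState> collectMerging(List<SubtaskState> states) {
        return states.stream()
                .collect(Collectors.toMap(s -> s.subtaskId, s -> s, SubtaskState::merge));
    }

    public static void main(String[] args) {
        List<SubtaskState> restored = Arrays.asList(
                new SubtaskState(0, 2), new SubtaskState(1, 1), new SubtaskState(0, 3));
        System.out.println(collectMerging(restored).get(0).pending);
    }
}
```

Whether two entries for the same subtask id should ever reach this point
after recovery is the open question raised in the message; the sketch only
shows the mechanics of the failure and of the workaround.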
>>
>> With those modifications our Delta test [7] started to pass on Flink 1.15.
>>
>> I do not know whether setting the parallelism of the Committer to 1 is
>> the right thing to do. As I mentioned, the Committer does some work in our
>> Sink implementation, and we might have more uses for it in future features
>> that would benefit from keeping its parallelism equal to the writer count.
>>
>> I still think there is some issue with the V2 architecture for topologies
>> with a GlobalCommitter in failover scenarios [4], and the duplicated key
>> in [5] described above may be another case of it; maybe we should never
>> have two entries for the same subtaskId. That I don't know.
>>
>> P.S.
>> Steven, apologies for hijacking the thread a little bit.
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>> [1]
>> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
>> [2]
>> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>> [3]
>> https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
>> [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>> [5]
>> https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
>> [7]
>> https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
>>
>> On Wed, Sep 14, 2022 at 05:26 Steven Wu <st...@gmail.com> wrote:
>> > setting the committer parallelism to 1.
>>
>> Yun, setting the parallelism to 1 is essentially a global committer. That
>> would work. I'm not sure about the implications for other parts of the v2
>> sink interface.
>>
>> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
>> krzysiek.chmielewski@gmail.com> wrote:
>> Hi Martijn,
>> Could you clarify a little bit what you mean by:
>>
>> "The important part to remember is that this
>> topology is lagging one checkpoint behind in terms of fault-tolerance: it
>> only receives data once the committer committed"
>>
>> What are the implications?
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yu...@aliyun.com.invalid> wrote:
>> Hi,
>> Very sorry for the late reply; I was on holiday.
>> Many thanks for the discussion. It also reminds me of one more piece of
>> background on the change to the GlobalCommitter:
>> When we refactored the job finish process in FLIP-147 to ensure that all
>> records could be committed at the end of a bounded streaming job, we had
>> to drop support for cascading commits, which means the cascading commit
>> `committer -> global committer` does not work in all cases.
>> For the current issues, one possible alternative from my side is that we
>> may support setting the committer parallelism to 1. Could this option
>> solve the issue in the current scenarios? I'll also double check whether
>> it can be implemented, and look into the failed tests Krzysztof ran into.
>> Best,
>> Yun
>> ------------------------------------------------------------------
>> From:Steven Wu <st...@gmail.com>
>> Send Time:2022 Sep. 10 (Sat.) 11:31
>> To:dev <de...@flink.apache.org>
>> Cc:Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
>> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>> Martijn, thanks a lot for chiming in!
>> Here are my concerns with adding the GlobalCommitter in the
>> PostCommitTopology:
>> 1. When we use TwoPhaseCommittingSink, we would need to create a
>>    noop/dummy committer. The actual Iceberg/DeltaLake commits happen in
>>    the PostCommit stage. The PostCommit stage should be doing some work
>>    after the commit (not for the commit).
>> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
>>    certain point. What then?
>> Thanks,
>> Steven
>> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
>> krzysiek.chmielewski@gmail.com> wrote:
>> Thanks Martijn,
>>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>>  SinkV1Adapter with GlobalCommitterOperator.
>>  Having said that, I might have found a potential issue with
>>  GlobalCommitterOperator, checkpointing and failover recovery [1].
>>  For "normal" scenarios it does look good though.
>>  Regards,
>>  Krzysztof Chmielewski
>>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>>  On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvisser@apache.org>
>>  wrote:
>>  > Hi all,
>>  >
>>  > A couple of bits from when work was being done on the new sink: V1 is
>>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>>  >
>>  > If there's desire to stick to the `GlobalCommitter` interface, have a
>>  > look at the StandardSinkTopologies. Or you can just add your own more
>>  > fitting PostCommitTopology. The important part to remember is that this
>>  > topology is lagging one checkpoint behind in terms of fault-tolerance:
>>  > it only receives data once the committer committed on
>>  > notifyCheckpointComplete. Thus, the global committer needs to be
>>  > idempotent and able to restore the actual state on recovery. That
>>  > limitation is coming in from Flink's checkpointing behaviour and
>>  > applies to both V1 and V2. GlobalCommitterOperator is abstracting these
>>  > issues along with handling retries (so commits that happen much later).
>>  > So it's probably a good place to start just with the standard topology.
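One common way to make a global commit idempotent is to record the last
committed checkpoint id together with the committed data and to skip replays.
The sketch below is plain Java under that assumption; Table and commit are
hypothetical names, and nothing here reflects the actual logic of
GlobalCommitterOperator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of an idempotent global commit keyed by checkpoint id. Not Flink API. */
final class IdempotentGlobalCommitSketch {

    /** Hypothetical external table that durably records the last committed checkpoint id. */
    static final class Table {
        long lastCommittedCheckpointId = -1L;
        final List<String> files = new ArrayList<>();
    }

    /** Returns true if the commit was applied, false if it was skipped as a replay. */
    static boolean commit(Table table, long checkpointId, List<String> newFiles) {
        if (checkpointId <= table.lastCommittedCheckpointId) {
            // Already committed before the failure; replaying must not add the files again.
            return false;
        }
        table.files.addAll(newFiles);
        table.lastCommittedCheckpointId = checkpointId;
        return true;
    }

    public static void main(String[] args) {
        Table table = new Table();
        commit(table, 1L, Arrays.asList("a.parquet"));
        // Replay after a simulated failover: the table is left unchanged.
        commit(table, 1L, Arrays.asList("a.parquet"));
        System.out.println(table.files.size());
    }
}
```

Reading lastCommittedCheckpointId back from the table on recovery is what
lets the committer "restore the actual state", since its own view can be one
checkpoint behind what was really committed.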
>>  >
>>  > Best regards,
>>  >
>>  > Martijn
>>  >
>>  > [1]
>>  > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>>  >
>>  > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <
>>  > krzysiek.chmielewski@gmail.com> wrote:
>>  >
>>  > > Hi,
>>  > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
>>  > > community here [2].
>>  > >
>>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
>>  > > exactly what the Flink-Delta Sink needs, since it is the place where
>>  > > we do the actual commit to the Delta Log, which should be done from a
>>  > > single place/instance.
>>  > >
>>  > > Currently I'm evaluating V2 for our connector, and having, as Steven
>>  > > described it, a "more natural, built-in concept/support of
>>  > > GlobalCommitter in the sink v2 interface" would be greatly
>>  > > appreciated.
>>  > >
>>  > > Cheers,
>>  > > Krzysztof Chmielewski
>>  > >
>>  > > [1] https://github.com/kristoffSC
>>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
>>  > >
>>  > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz3wu@gmail.com> wrote:
>>  > >
>>  > > > Hi Yun,
>>  > > >
>>  > > > Thanks a lot for the reply!
>>  > > >
>>  > > > While we can add the global committer in the
>>  > > > WithPostCommitTopology, the semantics are weird. The Commit stage
>>  > > > actually didn't commit anything to the Iceberg table, and the
>>  > > > PostCommit stage is where the Iceberg commit happens.
>>  > > >
>>  > > > I just took a quick look at DeltaLake Flink sink. It still uses
>>  > > > the V1 sink interface [1]. I think it might have the same issue
>>  > > > when switching to the V2 sink interface.
>>  > > >
>>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage
>>  > > > with global transactional commit, it would be more natural to have
>>  > > > a built-in concept/support of GlobalCommitter in the sink v2
>>  > > > interface.
>>  > > >
>>  > > > Thanks,
>>  > > > Steven
>>  > > >
>>  > > > [1]
>>  > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>>  > > >
>>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid>
>>  > > > wrote:
>>  > > >
>>  > > > > Hi Steven, Liwei,
>>  > > > > Very sorry for missing this mail and responding so late.
>>  > > > > I think the initial thought was indeed to use
>>  > > > > `WithPostCommitTopology` as a replacement for the original
>>  > > > > GlobalCommitter, and currently the adapter of Sink v1 on top of
>>  > > > > Sink v2 also maps the GlobalCommitter of the Sink V1 interface
>>  > > > > onto an implementation of `WithPostCommitTopology` [1].
>>  > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
>>  > > > > seems to me that it could support both a global committer and
>>  > > > > small file compaction? We might have a `WithPostCommitTopology`
>>  > > > > implementation like
>>  > > > > DataStream ds = add global committer;
>>  > > > > if (enable file compaction) {
>>  > > > >     build the compaction subgraph from ds
>>  > > > > }
>>  > > > > Best,
>>  > > > > Yun
>>  > > > > [1]
>>  > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>  > > > >
>> ------------------------------------------------------------------
>>  > > > > From:Steven Wu <stevenz3wu@gmail.com>
>>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>>  > > > > To:dev <dev@flink.apache.org>; hililiwei <hililiwei@gmail.com>
>>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>  > > > > > Plus, it will disable the future capability of small file
>>  > compaction
>>  > > > > stage post commit.
>>  > > > > I should clarify this comment. if we are using the
>>  > > > `WithPostCommitTopology`
>>  > > > > for global committer, we would lose the capability of using the
>> post
>>  > > > commit
>>  > > > > stage for small files compaction.
>>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>  > > > > >
>>  > > > > > In the V1 sink interface, there is a GlobalCommitter for
>> Iceberg.
>>  > > With
>>  > > > > the
>>  > > > > > V2 sink interface, GlobalCommitter has been deprecated by
>>  > > > > > WithPostCommitTopology. I thought the post commit stage is
>> mainly
>>  > for
>>  > > > > async
>>  > > > > > maintenance (like compaction).
>>  > > > > >
>>  > > > > > Are we supposed to do sth similar to the
>>  > GlobalCommittingSinkAdapter?
>>  > > > It
>>  > > > > > seems like a temporary transition plan for bridging v1 sinks
>> to v2
>>  > > > > > interfaces.
>>  > > > > >
>>  > > > > > private class GlobalCommittingSinkAdapter extends
>>  > > > > TwoPhaseCommittingSinkAdapter
>>  > > > > > implements WithPostCommitTopology<InputT, CommT> {
>>  > > > > > @Override
>>  > > > > > public void
>>  > > addPostCommitTopology(DataStream<CommittableMessage<CommT>>
>>  > > > > committables) {
>>  > > > > > StandardSinkTopologies.addGlobalCommitter(
>>  > > > > > committables,
>>  > > > > > GlobalCommitterAdapter::new,
>>  > > > > > () -> sink.getCommittableSerializer().get());
>>  > > > > > }
>>  > > > > > }
>>  > > > > >
>>  > > > > >
>>  > > > > > In the Iceberg PR [1] for adopting the new sink interface,
>> Liwei
>>  > used
>>  > > > the
>>  > > > > > "global" partitioner to force all committables go to a single
>>  > > committer
>>  > > > > > task 0. It will effectively force a global committer disguised
>> in
>>  > the
>>  > > > > > parallel committers. It is a little weird and also can lead to
>>  > > > questions
>>  > > > > > why other committer tasks are not getting any messages. Plus,
>> it
>>  > will
>>  > > > > > disable the future capability of small file compaction stage
>> post
>>  > > > commit.
>>  > > > > > Hence, I am asking what is the right approach to achieve global
>>  > > > committer
>>  > > > > > behavior.
>>  > > > > >
>>  > > > > > Thanks,
>>  > > > > > Steven
>>  > > > > >
>>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
>>  > > > > >
>>  > > > >
>>  > > >
>>  > >
>>  >
>>
>>
>> --
liwei li

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Steven Wu <st...@gmail.com>.
Yun,

> `writer (parallelism = n) -> committer (parallelism = 1)`

Yes, a single-parallelism committer would work for Iceberg (and probably
other similar global transaction storage). This is how Iceberg sink is
implemented today.
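
To make the `writer (parallelism = n) -> committer (parallelism = 1)` shape concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink or Iceberg API: the `Table` and `Committer` classes, the file names, and the rule of skipping checkpoints that were already committed are all assumptions made for illustration. It only shows that one committer instance can gather the files from every writer subtask and apply them as one transaction per checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of `writer (parallelism = n) -> committer (parallelism = 1)`; not Flink API. */
final class SingleCommitterSketch {

    /** Hypothetical stand-in for a table with globally transactional commits. */
    static final class Table {
        final List<String> committedFiles = new ArrayList<>();
        long lastCommittedCheckpoint = -1;

        /** Applies one transaction per checkpoint; a replayed old checkpoint is ignored. */
        boolean commit(long checkpointId, List<String> files) {
            if (checkpointId <= lastCommittedCheckpoint) {
                return false; // already committed, e.g. replayed after a failover
            }
            committedFiles.addAll(files);
            lastCommittedCheckpoint = checkpointId;
            return true;
        }
    }

    /** The only committer instance: buffers files from all writer subtasks, keyed by checkpoint. */
    static final class Committer {
        private final TreeMap<Long, List<String>> pending = new TreeMap<>();

        void collect(long checkpointId, String dataFile) {
            pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(dataFile);
        }

        /** Commits everything up to and including the completed checkpoint, in checkpoint order. */
        int onCheckpointComplete(long checkpointId, Table table) {
            int transactions = 0;
            Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
            for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
                if (table.commit(entry.getKey(), entry.getValue())) {
                    transactions++;
                }
            }
            ready.clear(); // the view is backed by `pending`, so this drops the handled entries
            return transactions;
        }
    }

    public static void main(String[] args) {
        Table table = new Table();
        Committer committer = new Committer();
        // Three writer subtasks each hand over one data file for checkpoint 1.
        for (int subtask = 0; subtask < 3; subtask++) {
            committer.collect(1L, "data-" + subtask + ".parquet");
        }
        int transactions = committer.onCheckpointComplete(1L, table);
        System.out.println(transactions + " transaction(s), " + table.committedFiles.size() + " file(s)");
    }
}
```

The checkpoint-id guard in `Table.commit` is one possible way to keep a replayed commit harmless; a real sink would need to persist that marker together with the table metadata.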

> When we are refactoring the job finish process in FLIP-147 to ensures all
> the records could be committed at the end of bounded streaming job, we have
> to desert the support for the cascade commits, which makes the cascade
> commit of `committer -> global committer` not work in all cases.

This is not a concern for Iceberg, as Iceberg doesn't need cascading
"committers -> global committers".

Thanks,
Steven


On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yu...@aliyun.com> wrote:

> Hi all,
>
> Very sorry for the long delay for it took a bit of time to do the
> investigation.
>
> @Krzysztof
> For the current unexpected behavior of GlobalCommitter
> after failover, it is indeed caused by the bugs in implementation,
> I filed an issue[1] for these bugs and I think we may fix the issue
> for 1.15.x, 1.16.x and 1.17.x before the next minor releases.
>
> @Steven @Krzysztof
> For the long-term solution >= 1.17, for supporting changing the parallelism
> of the Committer operator, I initially meant that we could set the
> parallelism of the committer operator, thus it could support arbitrary
> parallelism:
>
> 1. For ordinary sink that does not require global committer, it is still
> `writer (parallelism = n) ->
>     committer (parallelism = n)`.
> 2. If the committer is not required in the implementation of v1, now it
> could be
>     reimplemented with `writer (parallelism = n) -> committer (parallelism
> = 1)`.
> 3. If the committer is required in the implementation of v1, now it has
> several ways
>     to re-implement on v2, like `writer -> pre-committer topology to do
> the rename ->
>     committer (parallelism = 1)`.
>
> Do you think this would be reasonable?
>
> Also @Krzysztof I have the same question with Steven that is it possible
> directly write to
> the formal files and skip the step of renaming? Since before the meta is
> written to the Delta Log
> I guess the files are not visible to the users, thus it is safe to
> directly write to the formal files?
>
> Best,
> Yun Gao
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-29459

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi all,

Very sorry for the long delay; the investigation took a bit of time.

@Krzysztof
The current unexpected behavior of GlobalCommitter after failover is indeed
caused by bugs in the implementation. I filed an issue [1] for these bugs, and
I think we can fix them for 1.15.x, 1.16.x and 1.17.x before the next minor releases.

@Steven @Krzysztof
For the long-term solution (>= 1.17), regarding support for changing the parallelism
of the Committer operator: I initially meant that we could allow setting the
parallelism of the committer operator, so that it supports arbitrary parallelism:

1. For an ordinary sink that does not require a global committer, it is still
 `writer (parallelism = n) -> committer (parallelism = n)`.
2. If the committer is not required in the v1 implementation, the sink can now be
 reimplemented as `writer (parallelism = n) -> committer (parallelism = 1)`.
3. If the committer is required in the v1 implementation, there are several ways
 to reimplement it on v2, such as `writer -> pre-committer topology to do the rename ->
 committer (parallelism = 1)`.

Do you think this would be reasonable?

Also @Krzysztof, I have the same question as Steven: is it possible to write directly to
the formal files and skip the renaming step? I guess the files are not visible to users
before the metadata is written to the Delta Log, so it should be safe to write directly
to the formal files?

Best,
Yun Gao

[1] https://issues.apache.org/jira/browse/FLINK-29459
------------------------------------------------------------------
From:Steven Wu <st...@gmail.com>
Send Time:2022 Sep. 14 (Wed.) 21:33
To:Krzysztof Chmielewski <kr...@gmail.com>
Cc:dev <de...@flink.apache.org>; Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
Krzysztof, no worries. We are discussing the same topic (how to support storage with globally transactional commits).
> In Delta Sink connector we actually use both Committer [1] and GlobalCommitter [2]. The former, since we are using Flink's Parquet file writers, is doing a very simple job of "renaming the hidden file to make it visible and removing from the name some 'in-progress file' marker". The GlobalCommitter is committing data to the Delta Log.
Curious if the writers can write the visible files directly (vs hidden files first then renamed by committer). Since there is a global committer to commit the data files when Flink checkpoint completes, job failure or restart shouldn't cause data file dups or loss. I probably missed some context here.
On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
Hi Yun,
Thanks for your input.
In Delta Sink connector we actually use both Committer [1] and GlobalCommitter [2]. The former, since we are using Flink's Parquet file writers, is doing a very simple job of "renaming the hidden file to make it visible and removing from the name some 'in-progress file' marker". The GlobalCommitter is committing data to the Delta Log.
With this design, having many Committer instances actually benefits us. We also foresee upcoming features in our connector that would benefit from separate Committers with a parallelism higher than 1.
As I understood your suggestion, Yun (and maybe I misinterpreted it), the idea is to use both Committer and GlobalCommitter but to enforce parallelism 1 on the former. The GlobalCommitter created by the SinkV1Adapter in Flink 1.15 has parallelism 1, as expected and as it was in Flink < 1.15.
Anyway, I played a little bit with the Flink code and managed to achieve this [3]. After some additional changes, which I describe below, our test described in [4] passed without any data loss and without any exceptions thrown by Flink.
However, changing the parallelism of the Committer operator to a hardcoded value of 1 was not enough, and I had to do two more things:
1. Add a rebalance step (RebalancePartitioner) to the graph between the writer and the committer, since they now have different parallelism and the default partitioner was FORWARD, which caused an exception to be thrown. BTW, this is clear and understood.
2. Modify Flink's CommittableCollectorSerializer [5], and this, I believe, is the important part.
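
The first point can be pictured with a small toy model. The sketch below is plain Java and does not use Flink's partitioner classes; the method names are hypothetical, and it only assumes what the message states: a forward connection keeps each record on the same subtask index, so it cannot connect operators with different parallelism, while a rebalance connection distributes records round-robin and works for any pair of parallelism values.

```java
/** Toy channel selection for two partitioning strategies; not Flink's partitioner classes. */
final class PartitioningSketch {

    /** Forward: a record stays on the same subtask index, so both sides need equal parallelism. */
    static int forwardChannel(int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new IllegalStateException(
                    "forward needs equal parallelism, got "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return upstreamSubtask;
    }

    /** Rebalance: round-robin over the downstream subtasks, valid for any parallelism pair. */
    static int rebalanceChannel(long recordIndex, int downstreamParallelism) {
        return (int) (recordIndex % downstreamParallelism);
    }

    public static void main(String[] args) {
        // With a single committer, every record from the writers lands on channel 0.
        for (long record = 0; record < 4; record++) {
            System.out.println("record " + record + " -> committer " + rebalanceChannel(record, 1));
        }
        try {
            forwardChannel(2, 4, 1); // four writers, one committer
        } catch (IllegalStateException e) {
            System.out.println("forward rejected: " + e.getMessage());
        }
    }
}
```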
The modification I had to make was caused by a "Duplicate Key" exception from the deserialize(int version, byte[] serialized) method at line 143 of [5], where we process a stream of SubtaskCommittableManager objects and collect them into a Map. The map key is the subtaskId from the SubtaskCommittableManager object.
After Task Manager recovery, it may happen that the List of SubtaskCommittableManager objects processed in that deserialize method contains two SubtaskCommittableManager instances for the same subtask ID. For such a case, I call the SubtaskCommittableManager.merge(...) method.
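
The failure mode and the workaround can be reproduced outside Flink. The sketch below is plain Java under stated assumptions: `SubtaskEntry` is a hypothetical stand-in for a per-subtask committable holder, and the failure is assumed to come from collecting into a map that rejects duplicate keys, which is how the two-argument `Collectors.toMap` behaves. The three-argument overload accepts a merge function and resolves the collision instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Toy reproduction of a duplicate-key failure and a merge-based workaround; not Flink code. */
final class MergeOnDuplicateSketch {

    /** Hypothetical stand-in for a per-subtask committable holder. */
    static final class SubtaskEntry {
        final int subtaskId;
        final List<String> committables;

        SubtaskEntry(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = new ArrayList<>(committables);
        }

        /** Combines two entries that belong to the same subtask. */
        SubtaskEntry merge(SubtaskEntry other) {
            List<String> all = new ArrayList<>(committables);
            all.addAll(other.committables);
            return new SubtaskEntry(subtaskId, all);
        }
    }

    /** Two-argument toMap: throws IllegalStateException when two entries share a key. */
    static Map<Integer, SubtaskEntry> strict(List<SubtaskEntry> entries) {
        return entries.stream().collect(Collectors.toMap(e -> e.subtaskId, e -> e));
    }

    /** Three-argument toMap: resolves key collisions with the supplied merge function. */
    static Map<Integer, SubtaskEntry> merging(List<SubtaskEntry> entries) {
        return entries.stream()
                .collect(Collectors.toMap(e -> e.subtaskId, e -> e, SubtaskEntry::merge));
    }

    public static void main(String[] args) {
        List<SubtaskEntry> restored = Arrays.asList(
                new SubtaskEntry(0, Arrays.asList("c1")),
                new SubtaskEntry(1, Arrays.asList("c2")),
                new SubtaskEntry(0, Arrays.asList("c3"))); // same subtask id appears twice
        try {
            strict(restored);
        } catch (IllegalStateException e) {
            System.out.println("strict collection failed: " + e.getMessage());
        }
        Map<Integer, SubtaskEntry> merged = merging(restored);
        System.out.println("subtask 0 holds " + merged.get(0).committables);
    }
}
```

Whether merging is the right semantics, or whether two entries for one subtask should never occur in the first place, is exactly the open question raised in this message; the sketch only shows the mechanics.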
With those modifications our Delta test [7] started to pass on Flink 1.15.
I do not know whether setting the parallelism of the Committer to 1 is the right thing to do. As I mentioned, the Committer does some work in our Sink implementation, and we might have more uses for it in future features that would benefit from keeping its parallelism equal to the writer count.
I still think there is some issue with the V2 architecture for topologies with GlobalCommitter in failover scenarios [4], and the duplicated key in [5] described above may be another case; maybe we should never have two entries for the same subtaskId. That I don't know.
P.S.
Steven, apologies for hijacking the thread a little bit.
Thanks,
Krzysztof Chmielewski
[1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
[2] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
[3] https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
[4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
[5] https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
[7] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
On Wed, Sep 14, 2022 at 05:26, Steven Wu <stevenz3wu@gmail.com> wrote:
> setting the committer parallelism to 1.
Yun, setting the parallelism to 1 is essentially a global committer. That would work. I am not sure about the implications for other parts of the v2 sink interface.
On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
Hi Martijn
Could you clarify a little bit what you mean by:
"The important part to remember is that this
topology is lagging one checkpoint behind in terms of fault-tolerance: it
only receives data once the committer committed"
What are the implications? 
Thanks,
Krzysztof Chmielewski
On Tue, Sep 13, 2022 at 09:57, Yun Gao <yu...@aliyun.com.invalid> wrote:
Hi,
 Very sorry for the late reply; I was on holiday.
 Thanks a lot for the discussion. It also reminds me of
 one more piece of background on the change to the GlobalCommitter:
 when we refactored the job finish process in FLIP-147 to
 ensure that all records could be committed at the end of a bounded
 streaming job, we had to drop support for cascading commits,
 which means the cascading commit of `committer -> global committer` does not work
 in all cases.
 For the current issues, one possible alternative from my side is that we
 could support setting the committer parallelism to 1. Would this option solve
 the issue in the current scenarios? I'll also double-check whether
 it can be implemented and look into the failed tests Krzysztof encountered.
 Best,
 Yun
 ------------------------------------------------------------------
 From:Steven Wu <stevenz3wu@gmail.com>
 Send Time:2022 Sep. 10 (Sat.) 11:31
 To:dev <dev@flink.apache.org>
 Cc:Yun Gao <yungao.gy@aliyun.com>; hililiwei <hililiwei@gmail.com>
 Subject:Re: Sink V2 interface replacement for GlobalCommitter
 Martjin, thanks a lot for chiming in!
 Here are my concerns with adding GlobalCommitter in the PostCommitTopology 
 1. when we use TwoPhaseCommittingSink. We would need to create a noop/dummy committer. Actual Iceberg/DeltaLake commits happen in the PostCommit stage. The PostCommit stage should be doing some work after the commit (not for the commit).
 2. GlobalCommitter is marked as @deprecated. It will be removed at a certain point. What then?
 Thanks,
 Steven
 On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
 Thanks Martijn,
 I'm actually trying to run our V1 Delta connector on Flink 1.15 using
 SinkV1Adapter with GlobalCommitterOperator.
 Having said that, I might have found a potential issue with
 GlobalCommitterOperator, checkpointing and failover recovery [1].
 For "normal" scenarios it does look good though.
 Regards,
 Krzysztof Chmielewski
 [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
 On Fri, Sep 9, 2022 at 20:49, Martijn Visser <martijnvisser@apache.org> wrote:
 > Hi all,
 >
 > A couple of bits from when work was being done on the new sink: V1 is
 > completely simulated as V2 [1]. V2 is strictly more expressive.
 >
 > If there's desire to stick to the `GlobalCommitter` interface, have a
 > look at the StandardSinkTopologies. Or you can just add your own more
 > fitting PostCommitTopology. The important part to remember is that this
 > topology is lagging one checkpoint behind in terms of fault-tolerance: it
 > only receives data once the committer committed
 > on notifyCheckpointComplete. Thus, the global committer needs to be
 > idempotent and able to restore the actual state on recovery. That
 > limitation is coming in from Flink's checkpointing behaviour and applies to
 > both V1 and V2. GlobalCommitterOperator is abstracting these issues along
 > with handling retries (so commits that happen much later). So it's probably
 > a good place to start just with the standard topology.
 >
 > Best regards,
 >
 > Martijn
 >
 > [1] https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
 >
 > On Thu, Sep 8, 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
 >
 > > Hi,
 > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
 > > community here [2].
 > >
 > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
 > > exactly what the Flink-Delta Sink needs, since it is the place where
 > > we do the actual commit to the Delta Log, which should be done from a
 > > single place/instance.
 > >
 > > Currently I'm evaluating V2 for our connector, and having, as Steven
 > > described it, a "more natural, built-in concept/support of GlobalCommitter
 > > in the sink v2 interface" would be greatly appreciated.
 > >
 > > Cheers,
 > > Krzysztof Chmielewski
 > >
 > > [1] https://github.com/kristoffSC
 > > [2] https://github.com/delta-io/connectors/tree/master/flink
 > >
 > > On Thu, Sep 8, 2022 at 19:51, Steven Wu <stevenz3wu@gmail.com> wrote:
 > >
 > > > Hi Yun,
 > > >
 > > > Thanks a lot for the reply!
 > > >
 > > > While we can add the global committer in the WithPostCommitTopology,
 > > > the semantics are weird. The Commit stage doesn't actually commit
 > > > anything to the Iceberg table, and the PostCommit stage is where the
 > > > Iceberg commit happens.
 > > >
 > > > I just took a quick look at the DeltaLake Flink sink. It still uses the
 > > > V1 sink interface [1]. I think it might have the same issue when
 > > > switching to the V2 sink interface.
 > > >
 > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
 > > > global transactional commit, it would be more natural to have a built-in
 > > > concept/support of GlobalCommitter in the sink v2 interface.
 > > >
 > > > Thanks,
 > > > Steven
 > > >
 > > > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
 > > >
 > > >
 > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid>
 > > > wrote:
 > > >
 > > > > Hi Steven, Liwei,
 > > > > Very sorry for missing this mail and responding so late.
 > > > > I think the initial thought is indeed to use `WithPostCommitTopology` as
 > > > > a replacement for the original GlobalCommitter, and currently the adapter
 > > > > of Sink v1 on top of Sink v2 also maps the GlobalCommitter in the Sink V1
 > > > > interface onto an implementation of `WithPostCommitTopology`.
 > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems
 > > > > to me it could support both the global committer and small file
 > > > > compaction? We might have a `WithPostCommitTopology` implementation like
 > > > > DataStream ds = add global committer;
 > > > > if (enable file compaction) {
 > > > >     build the compaction subgraph from ds
 > > > > }
 > > > > Best,
 > > > > Yun
 > > > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
 > > > > ------------------------------------------------------------------
 > > > > From:Steven Wu <stevenz3wu@gmail.com>
 > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
 > > > > To:dev <dev@flink.apache.org>; hililiwei <hililiwei@gmail.com>
 > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
 > > > > > Plus, it will disable the future capability of small file compaction
 > > > > > stage post commit.
 > > > > I should clarify this comment. If we are using the
 > > > > `WithPostCommitTopology` for the global committer, we would lose the
 > > > > capability of using the post commit stage for small file compaction.
 > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com> wrote:
 > > > > >
 > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg. With
 > > > > > the V2 sink interface, GlobalCommitter has been deprecated by
 > > > > > WithPostCommitTopology. I thought the post commit stage is mainly for
 > > > > > async maintenance (like compaction).
 > > > > >
 > > > > > Are we supposed to do something similar to the
 > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition plan
 > > > > > for bridging v1 sinks to v2 interfaces.
 > > > > >
 > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
 > > > > >         implements WithPostCommitTopology<InputT, CommT> {
 > > > > >     @Override
 > > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
 > > > > >         StandardSinkTopologies.addGlobalCommitter(
 > > > > >                 committables,
 > > > > >                 GlobalCommitterAdapter::new,
 > > > > >                 () -> sink.getCommittableSerializer().get());
 > > > > >     }
 > > > > > }
 > > > > >
 > > > > >
 > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used
 > > > > > the "global" partitioner to force all committables to go to a single
 > > > > > committer task 0. It effectively forces a global committer disguised
 > > > > > in the parallel committers. It is a little weird and can also lead to
 > > > > > questions about why other committer tasks are not getting any
 > > > > > messages. Plus, it will disable the future capability of small file
 > > > > > compaction stage post commit. Hence, I am asking what is the right
 > > > > > approach to achieve global committer behavior.
 > > > > >
 > > > > > Thanks,
 > > > > > Steven
 > > > > >
 > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
 > > > > >
 > > > >
 > > >
 > >
 >

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Steven Wu <st...@gmail.com>.
Krzysztof, no worries. We are discussing the same topic (how to support
storage with globally transactional commits).

> In the Delta Sink connector we actually use both a Committer [1] and a
GlobalCommitter [2]. The former, since we are using Flink's Parquet file
writers, does a very simple job of "renaming the hidden file to make
it visible and removing from the name some 'in-progress file' marker". The
GlobalCommitter commits data to the Delta Log.

Curious if the writers can write the visible files directly (vs hidden
files first then renamed by committer). Since there is a global committer
to commit the data files when Flink checkpoint completes, job failure or
restart shouldn't cause data file dups or loss. I probably missed some
context here.
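
A minimal sketch of why a single commit point keeps retries safe, assuming
the table log records each Flink checkpoint id at most once. TableLogSketch
and its methods are hypothetical names for illustration, not Iceberg or
Delta APIs. Data files that were written but never listed in a commit
simply stay invisible to readers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a table's transaction log.
final class TableLogSketch {
    // checkpoint id -> data files made visible by that commit
    private final Map<Long, List<String>> committed = new TreeMap<>();

    // Returns true for a new commit; replaying a checkpoint id that is
    // already in the log is a no-op, so a retry after failover adds nothing.
    boolean commit(long checkpointId, List<String> dataFiles) {
        if (committed.containsKey(checkpointId)) {
            return false;
        }
        committed.put(checkpointId, List.copyOf(dataFiles));
        return true;
    }

    // Only files referenced by some commit are visible to readers.
    List<String> visibleFiles() {
        List<String> files = new ArrayList<>();
        committed.values().forEach(files::addAll);
        return files;
    }

    public static void main(String[] args) {
        TableLogSketch log = new TableLogSketch();
        log.commit(1L, List.of("part-0.parquet", "part-1.parquet"));
        // Simulated replay of the same checkpoint after a restart.
        log.commit(1L, List.of("part-0.parquet", "part-1.parquet"));
        System.out.println(log.visibleFiles());
    }
}
```

Under that assumption, whether the writers produce hidden or visible file
names does not change what readers of the log see.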

On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <
krzysiek.chmielewski@gmail.com> wrote:

> Hi Yun,
> Thanks for your input.
>
> In the Delta Sink connector we actually use both a Committer [1] and a
> GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> writers, does a very simple job of "renaming the hidden file to make
> it visible and removing from the name some 'in-progress file' marker". The
> GlobalCommitter commits data to the Delta Log.
>
> With this design, having many Committer instances actually benefits us.
> Plus, we foresee upcoming features in our connector that would benefit
> from separate Committers with a parallelism higher than 1.
>
> How I understood your suggestion, Yun (and maybe it was a wrong
> interpretation), is to use both Committer and GlobalCommitter but to enforce
> parallelism 1 on the former. The GlobalCommitter created by Flink 1.15's
> SinkV1Adapter has parallelism 1, as expected and as it was in Flink <
> 1.15.
>
> Anyway, I've played a little bit with the Flink code and I managed to
> achieve this [3]. After some additional changes, which I describe
> below, our test described in [4] passed without any data loss and with no
> exceptions thrown by Flink.
>
> However, changing the parallelism of the Committer operator to a hardcoded
> value of 1 was not enough, and I had to do two more things:
> 1. add a rebalance step (RebalancePartitioner) to the graph between writer
> and committer, since they now have different parallelism and the default
> partitioner was FORWARD, which caused an exception to be thrown - BTW this
> is clear and understood
> 2. modify Flink's CommittableCollectorSerializer [5], and this is, I
> believe, an important thing.
>
> The modification I had to make was caused by a "Duplicate Key" exception
> from the deserialize(int version, byte[] serialized) method at line 143 of
> [5], where we process a stream of SubtaskCommittableManager objects and
> collect them into a Map. The map key is the subtaskId
> from the SubtaskCommittableManager object.
>
> After Task Manager recovery it may happen that the List of
> SubtaskCommittableManager objects processed in that deserialize method
> contains two SubtaskCommittableManager entries for the same subtask ID.
> What I did is that in such a case I call the
> SubtaskCommittableManager.merge(...) method.
>
> With those modifications our Delta test [7] started to pass on Flink 1.15.
>
> I do not know whether setting the parallelism of the Committer to 1 is the
> right thing to do. As I mentioned, the Committer does some work in our
> Sink implementation, and we might have more uses for it in upcoming
> features that would benefit from keeping its parallelism equal
> to the writer count.
>
> I still think there is some issue with the V2 architecture for topologies
> with a GlobalCommitter in failover scenarios [4], and the duplicate
> key in [5] described above may be another instance of it; maybe we should
> never have two entries for the same subtaskId. That I don't know.
>
> P.S.
> Steven, apologies for hijacking the thread a little bit.
>
> Thanks,
> Krzysztof Chmielewski
>
> [1]
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
> [2]
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> [3]
> https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
> [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
> [5]
> https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
> [7]
> https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
>

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Hi Yun,
Thanks for your input.

In the Delta Sink connector we actually use both a Committer [1] and a
GlobalCommitter [2]. The former, since we are using Flink's Parquet file
writers, does a very simple job of "renaming the hidden file to make
it visible and removing from the name some 'in-progress file' marker". The
GlobalCommitter commits data to the Delta Log.

With this design, having many Committer instances actually benefits us.
Plus, we foresee upcoming features in our connector that would benefit
from separate Committers with a parallelism higher than 1.

How I understood your suggestion, Yun (and maybe it was a wrong
interpretation), is to use both Committer and GlobalCommitter but to enforce
parallelism 1 on the former. The GlobalCommitter created by Flink 1.15's
SinkV1Adapter has parallelism 1, as expected and as it was in Flink <
1.15.

Anyway, I've played a little bit with the Flink code and I managed to
achieve this [3]. After some additional changes, which I describe
below, our test described in [4] passed without any data loss and with no
exceptions thrown by Flink.

However, changing the parallelism of the Committer operator to a hardcoded
value of 1 was not enough, and I had to do two more things:
1. add a rebalance step (RebalancePartitioner) to the graph between writer
and committer, since they now have different parallelism and the default
partitioner was FORWARD, which caused an exception to be thrown - BTW this
is clear and understood
2. modify Flink's CommittableCollectorSerializer [5], and this is, I
believe, an important thing.
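
To illustrate point 1, here is a toy model of the two channel-selection
strategies. It is only a sketch: PartitionerSketch and its methods are
made-up names, not Flink classes, and Flink's real rebalance partitioner is
more involved than a plain modulo. The idea it shows is that forward
needs matching parallelism on both sides, while rebalance works with any
downstream parallelism, including 1:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of two channel-selection strategies; not Flink's actual classes.
final class PartitionerSketch {

    // FORWARD-style: upstream subtask i talks only to downstream subtask i,
    // so both sides must have the same parallelism.
    static int forwardChannel(int upstreamSubtask, int upstreamParallelism,
                              int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "forward partitioning does not allow a change of parallelism");
        }
        return upstreamSubtask;
    }

    // Rebalance-style: records are spread round-robin over downstream subtasks.
    static int rebalanceChannel(long recordIndex, int downstreamParallelism) {
        return (int) (recordIndex % downstreamParallelism);
    }

    public static void main(String[] args) {
        int writers = 4;
        int committers = 1;
        List<Integer> targets = new ArrayList<>();
        for (long i = 0; i < 6; i++) {
            targets.add(rebalanceChannel(i, committers));
        }
        // With a single committer every record lands on subtask 0.
        System.out.println("rebalance targets: " + targets);
        try {
            forwardChannel(2, writers, committers);
        } catch (UnsupportedOperationException e) {
            System.out.println("forward rejected: " + e.getMessage());
        }
    }
}
```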

The modification I had to made was caused by "Duplicate Key" exception from
deserialize(int version, byte[] serialized) method from line 143 of [5]
where we process a stream of SubtaskCommittableManager objects and collect
it into to the Map. The map key is a subtaskId
from SubtaskCommittableManager object.

After Task Manager recovery it may happen that the List of
SubtaskCommittableManager objects processed in that deserialize method
contains two SubtaskCommittableManager entries for the same subtask ID. What I
did is that, in such a case, I call the SubtaskCommittableManager.merge(...)
method.
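
The situation described above can be sketched in plain Java, assuming the
"Duplicate Key" error is the one java.util.stream.Collectors.toMap raises
when two stream elements map to the same key. SubtaskState below is a
hypothetical stand-in for SubtaskCommittableManager, and none of this is the
actual Flink code or the actual patch from [3].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for Flink's SubtaskCommittableManager; only the
// subtask ID and a list of pending committables are modelled here.
final class SubtaskState {
    final int subtaskId;
    final List<String> committables;

    SubtaskState(int subtaskId, List<String> committables) {
        this.subtaskId = subtaskId;
        this.committables = committables;
    }

    // Combines two entries for the same subtask into one.
    SubtaskState merge(SubtaskState other) {
        List<String> merged = new ArrayList<>(committables);
        merged.addAll(other.committables);
        return new SubtaskState(subtaskId, merged);
    }
}

final class DuplicateKeyDemo {
    // Two-argument toMap: throws IllegalStateException on a repeated key.
    static Map<Integer, SubtaskState> strict(List<SubtaskState> in) {
        return in.stream()
                .collect(Collectors.toMap(s -> s.subtaskId, Function.identity()));
    }

    // Three-argument toMap: merges entries that share a key.
    static Map<Integer, SubtaskState> merging(List<SubtaskState> in) {
        return in.stream()
                .collect(Collectors.toMap(
                        s -> s.subtaskId, Function.identity(), SubtaskState::merge));
    }
}
```

Whether merging is semantically correct for Flink's committable state is
exactly the open question; the sketch only shows the mechanics.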

With those modifications our Delta test [7] started to pass on Flink 1.15.

I do not know whether setting the parallelism level of the Committer to 1 is
the right thing to do. As I mentioned, the Committer does some work in our
Sink implementation, and we might have more use for it in upcoming features
that would benefit from keeping its parallelism level equal
to the writer count.

I still think there is some issue with the V2 architecture for topologies
with GlobalCommitter and failover scenarios [4], and the duplicate
key in [5] described above may be another case of it; maybe we should never
have two entries for the same subtaskId. That I don't know.

P.S.
Steven, apologies for hijacking the thread a little bit.

Thanks,
Krzysztof Chmielewski

[1]
https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
[2]
https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
[3]
https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
[4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
[5]
https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
[7]
https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java

On Wed, 14 Sep 2022 at 05:26, Steven Wu <st...@gmail.com> wrote:

> > setting the committer parallelism to 1.
>
> Yun, setting the parallelism to 1 is essentially a global committer. That
> would work. I am not sure about the implications for other parts of the v2
> sink interface.
>
> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
> krzysiek.chmielewski@gmail.com> wrote:
>
>> Hi Martijn,
>> Could you clarify a little bit what you mean by:
>>
>> "The important part to remember is that this
>> topology is lagging one checkpoint behind in terms of fault-tolerance: it
>> only receives data once the committer committed"
>>
>> What are the implications?
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>> On Tue, 13 Sep 2022 at 09:57, Yun Gao <yu...@aliyun.com.invalid> wrote:
>>
>>> Hi,
>>> Very sorry for the late reply; I was on holiday.
>>> Many thanks for the discussion. It also reminds me of
>>> one more piece of background on the GlobalCommitter change:
>>> when we refactored the job finish process in FLIP-147 to
>>> ensure that all records can be committed at the end of a bounded
>>> streaming job, we had to drop support for cascading commits,
>>> which means the cascading commit of `committer -> global committer` does
>>> not work in all cases.
>>> For the current issues, one possible alternative from my side is
>>> that we may support setting the committer parallelism to 1. Could this
>>> option solve the issue in the current scenarios? I'll also double-check
>>> whether it can be implemented, along with the failed tests Krzysztof hit.
>>> Best,
>>> Yun
>>> ------------------------------------------------------------------
>>> From:Steven Wu <st...@gmail.com>
>>> Send Time:2022 Sep. 10 (Sat.) 11:31
>>> To:dev <de...@flink.apache.org>
>>> Cc:Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
>>> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>> Martijn, thanks a lot for chiming in!
>>> Here are my concerns with adding GlobalCommitter in the
>>> PostCommitTopology
>>> 1. when we use TwoPhaseCommittingSink. We would need to create a
>>> noop/dummy committer. Actual Iceberg/DeltaLake commits happen in the
>>> PostCommit stage. The PostCommit stage should be doing some work after the
>>> commit (not for the commit).
>>> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
>>> certain point. What then?
>>> Thanks,
>>> Steven
>>> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
>>> krzysiek.chmielewski@gmail.com> wrote:
>>> Thanks Martijn,
>>>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>>>  SinkV1Adapter with GlobalCommitterOperator.
>>>  Having said that, I might have found a potential issue with
>>>  GlobalCommitterOperator, checkpointing and failover recovery [1].
>>>  For "normal" scenarios it does look good though.
>>>  Regards,
>>>  Krzysztof Chmielewski
>>>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc <
>>> https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc >
>>>  On Fri, 9 Sep 2022 at 20:49, Martijn Visser <martijnvisser@apache.org> wrote:
>>>  > Hi all,
>>>  >
>>>  > A couple of bits from when work was being done on the new sink: V1 is
>>>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>>>  >
>>>  > If there's desire to stick to the `GlobalCommitter` interface, have a
>>>  > look at the StandardSinkTopologies. Or you can just add your own more
>>>  > fitting PostCommitTopology. The important part to remember is that
>>> this
>>>  > topology is lagging one checkpoint behind in terms of
>>> fault-tolerance: it
>>>  > only receives data once the committer committed
>>>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>>>  > idempotent and able to restore the actual state on recovery. That
>>>  > limitation is coming in from Flink's checkpointing behaviour and
>>> applies to
>>>  > both V1 and V2. GlobalCommitterOperator is abstracting these issues
>>> along
>>>  > with handling retries (so commits that happen much later). So it's
>>> probably
>>>  > a good place to start just with the standard topology.
>>>  >
>>>  > Best regards,
>>>  >
>>>  > Martijn
>>>  >
>>>  > [1]
>>>  >
>>>  >
>>> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>>> <
>>> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>>> >
>>>  >
>>>  > On Thu, 8 Sep 2022 at 20:51, Krzysztof Chmielewski <
>>>  > krzysiek.chmielewski@gmail.com> wrote:
>>>  >
>>>  > > Hi,
>>>  > > Krzysztof Chmielewski [1] from Delta-Flink connector open source
>>>  > community
>>>  > > here [2].
>>>  > >
>>>  > > I'm totally agree with Steven on this. Sink's V1 GlobalCommitter is
>>>  > > something exactly what Flink-Delta Sink needs since it is the place
>>> where
>>>  > > we do an actual commit to the Delta Log which should be done from a
>>> one
>>>  > > place/instance.
>>>  > >
>>>  > > Currently I'm evaluating V2 for our connector and having, how Steven
>>>  > > described it a "more natural, built-in concept/support of
>>> GlobalCommitter
>>>  > > in the sink v2 interface" would be greatly appreciated.
>>>  > >
>>>  > > Cheers,
>>>  > > Krzysztof Chmielewski
>>>  > >
>>>  > > [1] https://github.com/kristoffSC <https://github.com/kristoffSC >
>>>  > > [2] https://github.com/delta-io/connectors/tree/master/flink <
>>> https://github.com/delta-io/connectors/tree/master/flink >
>>>  > >
>>>  > > On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz3wu@gmail.com> wrote:
>>>  > >
>>>  > > > Hi Yun,
>>>  > > >
>>>  > > > Thanks a lot for the reply!
>>>  > > >
>>>  > > > While we can add the global committer in the
>>> WithPostCommitTopology,
>>>  > the
>>>  > > > semantics are weird. The Commit stage actually didn't commit
>>> anything
>>>  > to
>>>  > > > the Iceberg table, and the PostCommit stage is where the Iceberg
>>> commit
>>>  > > > happens.
>>>  > > >
>>>  > > > I just took a quick look at DeltaLake Flink sink. It still uses
>>> the V1
>>>  > > sink
>>>  > > > interface [1]. I think it might have the same issue when
>>> switching to
>>>  > the
>>>  > > > V2 sink interface.
>>>  > > >
>>>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage
>>> with
>>>  > > global
>>>  > > > transactional commit, it would be more natural to have a built-in
>>>  > > > concept/support of GlobalCommitter in the sink v2 interface.
>>>  > > >
>>>  > > > Thanks,
>>>  > > > Steven
>>>  > > >
>>>  > > > [1]
>>>  > > >
>>>  > > >
>>>  > >
>>>  >
>>> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>>> <
>>> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>>> >
>>>  > > >
>>>  > > >
>>>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao
>>> <yu...@aliyun.com.invalid>
>>>  > > > wrote:
>>>  > > >
>>>  > > > > Hi Steven, Liwei,
>>>  > > > > Very sorry for missing this mail and response very late.
>>>  > > > > I think the initial thought is indeed to use
>>> `WithPostCommitTopology`
>>>  > > as
>>>  > > > > a replacement of the original GlobalCommitter, and currently the
>>>  > > adapter
>>>  > > > of
>>>  > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink
>>> V1
>>>  > > > > interface
>>>  > > > > onto an implementation of `WithPostCommitTopology`.
>>>  > > > > Since `WithPostCommitTopology` supports arbitrary subgraph,
>>> thus It
>>>  > > seems
>>>  > > > > to
>>>  > > > > me it could support both global committer and small file
>>> compaction?
>>>  > We
>>>  > > > > might
>>>  > > > > have an `WithPostCommitTopology` implementation like
>>>  > > > > DataStream ds = add global committer;
>>>  > > > > if (enable file compaction) {
>>>  > > > > build the compaction subgraph from ds
>>>  > > > > }
>>>  > > > > Best,
>>>  > > > > Yun
>>>  > > > > [1]
>>>  > > > >
>>>  > > >
>>>  > >
>>>  >
>>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>> <
>>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>> >
>>>  > > > > <
>>>  > > > >
>>>  > > >
>>>  > >
>>>  >
>>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>> <
>>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>> >
>>>  > > > > >
>>>  > > > >
>>> ------------------------------------------------------------------
>>>  > > > > From:Steven Wu <stevenz3wu@gmail.com <mailto:
>>> stevenz3wu@gmail.com >>
>>>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>>>  > > > > To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org >>;
>>> hililiwei <hililiwei@gmail.com <mailto:hililiwei@gmail.com >>
>>>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>>  > > > > > Plus, it will disable the future capability of small file
>>>  > compaction
>>>  > > > > stage post commit.
>>>  > > > > I should clarify this comment. if we are using the
>>>  > > > `WithPostCommitTopology`
>>>  > > > > for global committer, we would lose the capability of using the
>>> post
>>>  > > > commit
>>>  > > > > stage for small files compaction.
>>>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com
>>> <mailto:stevenz3wu@gmail.com >>
>>>  > > wrote:
>>>  > > > > >
>>>  > > > > > In the V1 sink interface, there is a GlobalCommitter for
>>> Iceberg.
>>>  > > With
>>>  > > > > the
>>>  > > > > > V2 sink interface, GlobalCommitter has been deprecated by
>>>  > > > > > WithPostCommitTopology. I thought the post commit stage is
>>> mainly
>>>  > for
>>>  > > > > async
>>>  > > > > > maintenance (like compaction).
>>>  > > > > >
>>>  > > > > > Are we supposed to do sth similar to the
>>>  > GlobalCommittingSinkAdapter?
>>>  > > > It
>>>  > > > > > seems like a temporary transition plan for bridging v1 sinks
>>> to v2
>>>  > > > > > interfaces.
>>>  > > > > >
>>>  > > > > > private class GlobalCommittingSinkAdapter extends
>>>  > > > > TwoPhaseCommittingSinkAdapter
>>>  > > > > > implements WithPostCommitTopology<InputT, CommT> {
>>>  > > > > > @Override
>>>  > > > > > public void
>>>  > > addPostCommitTopology(DataStream<CommittableMessage<CommT>>
>>>  > > > > committables) {
>>>  > > > > > StandardSinkTopologies.addGlobalCommitter(
>>>  > > > > > committables,
>>>  > > > > > GlobalCommitterAdapter::new,
>>>  > > > > > () -> sink.getCommittableSerializer().get());
>>>  > > > > > }
>>>  > > > > > }
>>>  > > > > >
>>>  > > > > >
>>>  > > > > > In the Iceberg PR [1] for adopting the new sink interface,
>>> Liwei
>>>  > used
>>>  > > > the
>>>  > > > > > "global" partitioner to force all committables go to a single
>>>  > > committer
>>>  > > > > > task 0. It will effectively force a global committer
>>> disguised in
>>>  > the
>>>  > > > > > parallel committers. It is a little weird and also can lead to
>>>  > > > questions
>>>  > > > > > why other committer tasks are not getting any messages. Plus,
>>> it
>>>  > will
>>>  > > > > > disable the future capability of small file compaction stage
>>> post
>>>  > > > commit.
>>>  > > > > > Hence, I am asking what is the right approach to achieve
>>> global
>>>  > > > committer
>>>  > > > > > behavior.
>>>  > > > > >
>>>  > > > > > Thanks,
>>>  > > > > > Steven
>>>  > > > > >
>>>  > > > > > [1]
>>> https://github.com/apache/iceberg/pull/4904/files#r946975047 <
>>> https://github.com/apache/iceberg/pull/4904/files#r946975047 > <
>>>  > > > > https://github.com/apache/iceberg/pull/4904/files#r946975047 <
>>> https://github.com/apache/iceberg/pull/4904/files#r946975047 > >
>>>  > > > > >
>>>  > > > >
>>>  > > >
>>>  > >
>>>  >
>>>
>>

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Steven Wu <st...@gmail.com>.
> setting the committer parallelism to 1.

Yun, setting the parallelism to 1 is essentially a global committer. That
would work. I am not sure about the implications for other parts of the v2
sink interface.

On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
krzysiek.chmielewski@gmail.com> wrote:

> Hi Martijn,
> Could you clarify a little bit what you mean by:
>
> "The important part to remember is that this
> topology is lagging one checkpoint behind in terms of fault-tolerance: it
> only receives data once the committer committed"
>
> What are the implications?
>
> Thanks,
> Krzysztof Chmielewski
>
> On Tue, 13 Sep 2022 at 09:57, Yun Gao <yu...@aliyun.com.invalid> wrote:
>
>> Hi,
>> Very sorry for the late reply; I was on holiday.
>> Many thanks for the discussion. It also reminds me of
>> one more piece of background on the GlobalCommitter change:
>> when we refactored the job finish process in FLIP-147 to
>> ensure that all records can be committed at the end of a bounded
>> streaming job, we had to drop support for cascading commits,
>> which means the cascading commit of `committer -> global committer` does
>> not work in all cases.
>> For the current issues, one possible alternative from my side is
>> that we may support setting the committer parallelism to 1. Could this
>> option solve the issue in the current scenarios? I'll also double-check
>> whether it can be implemented, along with the failed tests Krzysztof hit.
>> Best,
>> Yun
>> ------------------------------------------------------------------
>> From:Steven Wu <st...@gmail.com>
>> Send Time:2022 Sep. 10 (Sat.) 11:31
>> To:dev <de...@flink.apache.org>
>> Cc:Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
>> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>> Martijn, thanks a lot for chiming in!
>> Here are my concerns with adding GlobalCommitter in the
>> PostCommitTopology
>> 1. when we use TwoPhaseCommittingSink. We would need to create a
>> noop/dummy committer. Actual Iceberg/DeltaLake commits happen in the
>> PostCommit stage. The PostCommit stage should be doing some work after the
>> commit (not for the commit).
>> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
>> certain point. What then?
>> Thanks,
>> Steven
>> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
>> krzysiek.chmielewski@gmail.com> wrote:
>> Thanks Martijn,
>>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>>  SinkV1Adapter with GlobalCommitterOperator.
>>  Having said that, I might have found a potential issue with
>>  GlobalCommitterOperator, checkpointing and failover recovery [1].
>>  For "normal" scenarios it does look good though.
>>  Regards,
>>  Krzysztof Chmielewski
>>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc <
>> https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc >
>>  On Fri, 9 Sep 2022 at 20:49, Martijn Visser <martijnvisser@apache.org> wrote:
>>  > Hi all,
>>  >
>>  > A couple of bits from when work was being done on the new sink: V1 is
>>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>>  >
>>  > If there's desire to stick to the `GlobalCommitter` interface, have a
>>  > look at the StandardSinkTopologies. Or you can just add your own more
>>  > fitting PostCommitTopology. The important part to remember is that this
>>  > topology is lagging one checkpoint behind in terms of fault-tolerance:
>> it
>>  > only receives data once the committer committed
>>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>>  > idempotent and able to restore the actual state on recovery. That
>>  > limitation is coming in from Flink's checkpointing behaviour and
>> applies to
>>  > both V1 and V2. GlobalCommitterOperator is abstracting these issues
>> along
>>  > with handling retries (so commits that happen much later). So it's
>> probably
>>  > a good place to start just with the standard topology.
>>  >
>>  > Best regards,
>>  >
>>  > Martijn
>>  >
>>  > [1]
>>  >
>>  >
>> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>> <
>> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>> >
>>  >
>>  > On Thu, 8 Sep 2022 at 20:51, Krzysztof Chmielewski <
>>  > krzysiek.chmielewski@gmail.com> wrote:
>>  >
>>  > > Hi,
>>  > > Krzysztof Chmielewski [1] from Delta-Flink connector open source
>>  > community
>>  > > here [2].
>>  > >
>>  > > I'm totally agree with Steven on this. Sink's V1 GlobalCommitter is
>>  > > something exactly what Flink-Delta Sink needs since it is the place
>> where
>>  > > we do an actual commit to the Delta Log which should be done from a
>> one
>>  > > place/instance.
>>  > >
>>  > > Currently I'm evaluating V2 for our connector and having, how Steven
>>  > > described it a "more natural, built-in concept/support of
>> GlobalCommitter
>>  > > in the sink v2 interface" would be greatly appreciated.
>>  > >
>>  > > Cheers,
>>  > > Krzysztof Chmielewski
>>  > >
>>  > > [1] https://github.com/kristoffSC <https://github.com/kristoffSC >
>>  > > [2] https://github.com/delta-io/connectors/tree/master/flink <
>> https://github.com/delta-io/connectors/tree/master/flink >
>>  > >
>>  > > On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz3wu@gmail.com> wrote:
>>  > >
>>  > > > Hi Yun,
>>  > > >
>>  > > > Thanks a lot for the reply!
>>  > > >
>>  > > > While we can add the global committer in the
>> WithPostCommitTopology,
>>  > the
>>  > > > semantics are weird. The Commit stage actually didn't commit
>> anything
>>  > to
>>  > > > the Iceberg table, and the PostCommit stage is where the Iceberg
>> commit
>>  > > > happens.
>>  > > >
>>  > > > I just took a quick look at DeltaLake Flink sink. It still uses
>> the V1
>>  > > sink
>>  > > > interface [1]. I think it might have the same issue when switching
>> to
>>  > the
>>  > > > V2 sink interface.
>>  > > >
>>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage
>> with
>>  > > global
>>  > > > transactional commit, it would be more natural to have a built-in
>>  > > > concept/support of GlobalCommitter in the sink v2 interface.
>>  > > >
>>  > > > Thanks,
>>  > > > Steven
>>  > > >
>>  > > > [1]
>>  > > >
>>  > > >
>>  > >
>>  >
>> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>> <
>> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>> >
>>  > > >
>>  > > >
>>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao
>> <yu...@aliyun.com.invalid>
>>  > > > wrote:
>>  > > >
>>  > > > > Hi Steven, Liwei,
>>  > > > > Very sorry for missing this mail and response very late.
>>  > > > > I think the initial thought is indeed to use
>> `WithPostCommitTopology`
>>  > > as
>>  > > > > a replacement of the original GlobalCommitter, and currently the
>>  > > adapter
>>  > > > of
>>  > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink
>> V1
>>  > > > > interface
>>  > > > > onto an implementation of `WithPostCommitTopology`.
>>  > > > > Since `WithPostCommitTopology` supports arbitrary subgraph, thus
>> It
>>  > > seems
>>  > > > > to
>>  > > > > me it could support both global committer and small file
>> compaction?
>>  > We
>>  > > > > might
>>  > > > > have an `WithPostCommitTopology` implementation like
>>  > > > > DataStream ds = add global committer;
>>  > > > > if (enable file compaction) {
>>  > > > > build the compaction subgraph from ds
>>  > > > > }
>>  > > > > Best,
>>  > > > > Yun
>>  > > > > [1]
>>  > > > >
>>  > > >
>>  > >
>>  >
>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>> <
>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>> >
>>  > > > > <
>>  > > > >
>>  > > >
>>  > >
>>  >
>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>> <
>> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>> >
>>  > > > > >
>>  > > > >
>> ------------------------------------------------------------------
>>  > > > > From:Steven Wu <stevenz3wu@gmail.com <mailto:
>> stevenz3wu@gmail.com >>
>>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>>  > > > > To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org >>;
>> hililiwei <hililiwei@gmail.com <mailto:hililiwei@gmail.com >>
>>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>  > > > > > Plus, it will disable the future capability of small file
>>  > compaction
>>  > > > > stage post commit.
>>  > > > > I should clarify this comment. if we are using the
>>  > > > `WithPostCommitTopology`
>>  > > > > for global committer, we would lose the capability of using the
>> post
>>  > > > commit
>>  > > > > stage for small files compaction.
>>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com
>> <mailto:stevenz3wu@gmail.com >>
>>  > > wrote:
>>  > > > > >
>>  > > > > > In the V1 sink interface, there is a GlobalCommitter for
>> Iceberg.
>>  > > With
>>  > > > > the
>>  > > > > > V2 sink interface, GlobalCommitter has been deprecated by
>>  > > > > > WithPostCommitTopology. I thought the post commit stage is
>> mainly
>>  > for
>>  > > > > async
>>  > > > > > maintenance (like compaction).
>>  > > > > >
>>  > > > > > Are we supposed to do sth similar to the
>>  > GlobalCommittingSinkAdapter?
>>  > > > It
>>  > > > > > seems like a temporary transition plan for bridging v1 sinks
>> to v2
>>  > > > > > interfaces.
>>  > > > > >
>>  > > > > > private class GlobalCommittingSinkAdapter extends
>>  > > > > TwoPhaseCommittingSinkAdapter
>>  > > > > > implements WithPostCommitTopology<InputT, CommT> {
>>  > > > > > @Override
>>  > > > > > public void
>>  > > addPostCommitTopology(DataStream<CommittableMessage<CommT>>
>>  > > > > committables) {
>>  > > > > > StandardSinkTopologies.addGlobalCommitter(
>>  > > > > > committables,
>>  > > > > > GlobalCommitterAdapter::new,
>>  > > > > > () -> sink.getCommittableSerializer().get());
>>  > > > > > }
>>  > > > > > }
>>  > > > > >
>>  > > > > >
>>  > > > > > In the Iceberg PR [1] for adopting the new sink interface,
>> Liwei
>>  > used
>>  > > > the
>>  > > > > > "global" partitioner to force all committables go to a single
>>  > > committer
>>  > > > > > task 0. It will effectively force a global committer disguised
>> in
>>  > the
>>  > > > > > parallel committers. It is a little weird and also can lead to
>>  > > > questions
>>  > > > > > why other committer tasks are not getting any messages. Plus,
>> it
>>  > will
>>  > > > > > disable the future capability of small file compaction stage
>> post
>>  > > > commit.
>>  > > > > > Hence, I am asking what is the right approach to achieve global
>>  > > > committer
>>  > > > > > behavior.
>>  > > > > >
>>  > > > > > Thanks,
>>  > > > > > Steven
>>  > > > > >
>>  > > > > > [1]
>> https://github.com/apache/iceberg/pull/4904/files#r946975047 <
>> https://github.com/apache/iceberg/pull/4904/files#r946975047 > <
>>  > > > > https://github.com/apache/iceberg/pull/4904/files#r946975047 <
>> https://github.com/apache/iceberg/pull/4904/files#r946975047 > >
>>  > > > > >
>>  > > > >
>>  > > >
>>  > >
>>  >
>>
>

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Hi Martijn,
Could you clarify a little bit what you mean by:

"The important part to remember is that this
topology is lagging one checkpoint behind in terms of fault-tolerance: it
only receives data once the committer committed"

What are the implications?

Thanks,
Krzysztof Chmielewski

On Tue, 13 Sep 2022 at 09:57, Yun Gao <yu...@aliyun.com.invalid> wrote:

> Hi,
> Very sorry for the late reply; I was on holiday.
> Many thanks for the discussion. It also reminds me of
> one more piece of background on the GlobalCommitter change:
> when we refactored the job finish process in FLIP-147 to
> ensure that all records can be committed at the end of a bounded
> streaming job, we had to drop support for cascading commits,
> which means the cascading commit of `committer -> global committer` does
> not work in all cases.
> For the current issues, one possible alternative from my side is
> that we may support setting the committer parallelism to 1. Could this
> option solve the issue in the current scenarios? I'll also double-check
> whether it can be implemented, along with the failed tests Krzysztof hit.
> Best,
> Yun
> ------------------------------------------------------------------
> From:Steven Wu <st...@gmail.com>
> Send Time:2022 Sep. 10 (Sat.) 11:31
> To:dev <de...@flink.apache.org>
> Cc:Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> Martijn, thanks a lot for chiming in!
> Here are my concerns with adding GlobalCommitter in the PostCommitTopology
> 1. when we use TwoPhaseCommittingSink. We would need to create a
> noop/dummy committer. Actual Iceberg/DeltaLake commits happen in the
> PostCommit stage. The PostCommit stage should be doing some work after the
> commit (not for the commit).
> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
> certain point. What then?
> Thanks,
> Steven
> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
> krzysiek.chmielewski@gmail.com> wrote:
> Thanks Martijn,
>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>  SinkV1Adapter with GlobalCommitterOperator.
>  Having said that, I might have found a potential issue with
>  GlobalCommitterOperator, checkpointing and failover recovery [1].
>  For "normal" scenarios it does look good though.
>  Regards,
>  Krzysztof Chmielewski
>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc <
> https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc >
>  On Fri, 9 Sep 2022 at 20:49, Martijn Visser <martijnvisser@apache.org> wrote:
>  > Hi all,
>  >
>  > A couple of bits from when work was being done on the new sink: V1 is
>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>  >
>  > If there's desire to stick to the `GlobalCommitter` interface, have a
>  > look at the StandardSinkTopologies. Or you can just add your own more
>  > fitting PostCommitTopology. The important part to remember is that this
>  > topology is lagging one checkpoint behind in terms of fault-tolerance:
> it
>  > only receives data once the committer committed
>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>  > idempotent and able to restore the actual state on recovery. That
>  > limitation is coming in from Flink's checkpointing behaviour and
> applies to
>  > both V1 and V2. GlobalCommitterOperator is abstracting these issues
> along
>  > with handling retries (so commits that happen much later). So it's
> probably
>  > a good place to start just with the standard topology.
>  >
>  > Best regards,
>  >
>  > Martijn
>  >
>  > [1]
>  >
>  >
> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> <
> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> >
>  >
>  > On Thu, 8 Sep 2022 at 20:51, Krzysztof Chmielewski <
>  > krzysiek.chmielewski@gmail.com> wrote:
>  >
>  > > Hi,
>  > > Krzysztof Chmielewski [1] from Delta-Flink connector open source
>  > community
>  > > here [2].
>  > >
>  > > I'm totally agree with Steven on this. Sink's V1 GlobalCommitter is
>  > > something exactly what Flink-Delta Sink needs since it is the place
> where
>  > > we do an actual commit to the Delta Log which should be done from a
> one
>  > > place/instance.
>  > >
>  > > Currently I'm evaluating V2 for our connector and having, how Steven
>  > > described it a "more natural, built-in concept/support of
> GlobalCommitter
>  > > in the sink v2 interface" would be greatly appreciated.
>  > >
>  > > Cheers,
>  > > Krzysztof Chmielewski
>  > >
>  > > [1] https://github.com/kristoffSC <https://github.com/kristoffSC >
>  > > [2] https://github.com/delta-io/connectors/tree/master/flink <
> https://github.com/delta-io/connectors/tree/master/flink >
>  > >
>  > > On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz3wu@gmail.com> wrote:
>  > >
>  > > > Hi Yun,
>  > > >
>  > > > Thanks a lot for the reply!
>  > > >
>  > > > While we can add the global committer in the WithPostCommitTopology,
>  > the
>  > > > semantics are weird. The Commit stage actually didn't commit
> anything
>  > to
>  > > > the Iceberg table, and the PostCommit stage is where the Iceberg
> commit
>  > > > happens.
>  > > >
>  > > > I just took a quick look at DeltaLake Flink sink. It still uses the
> V1
>  > > sink
>  > > > interface [1]. I think it might have the same issue when switching
> to
>  > the
>  > > > V2 sink interface.
>  > > >
>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
>  > > global
>  > > > transactional commit, it would be more natural to have a built-in
>  > > > concept/support of GlobalCommitter in the sink v2 interface.
>  > > >
>  > > > Thanks,
>  > > > Steven
>  > > >
>  > > > [1]
>  > > >
>  > > >
>  > >
>  >
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> <
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> >
>  > > >
>  > > >
>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao.gy@aliyun.com.invalid
> >
>  > > > wrote:
>  > > >
>  > > > > Hi Steven, Liwei,
>  > > > > Very sorry for missing this mail and response very late.
>  > > > > I think the initial thought is indeed to use
> `WithPostCommitTopology`
>  > > as
>  > > > > a replacement of the original GlobalCommitter, and currently the
>  > > adapter
>  > > > of
>  > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink V1
>  > > > > interface
>  > > > > onto an implementation of `WithPostCommitTopology`.
>  > > > > Since `WithPostCommitTopology` supports arbitrary subgraph, thus
> It
>  > > seems
>  > > > > to
>  > > > > me it could support both global committer and small file
> compaction?
>  > We
>  > > > > might
>  > > > > have an `WithPostCommitTopology` implementation like
>  > > > > DataStream ds = add global committer;
>  > > > > if (enable file compaction) {
>  > > > > build the compaction subgraph from ds
>  > > > > }
>  > > > > Best,
>  > > > > Yun
>  > > > > [1]
>  > > > >
>  > > >
>  > >
>  >
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> <
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> >
>  > > > > <
>  > > > >
>  > > >
>  > >
>  >
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> <
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> >
>  > > > > >
>  > > > > ------------------------------------------------------------------
>  > > > > From:Steven Wu <stevenz3wu@gmail.com <mailto:stevenz3wu@gmail.com
> >>
>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>  > > > > To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org >>;
> hililiwei <hililiwei@gmail.com <mailto:hililiwei@gmail.com >>
>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>  > > > > > Plus, it will disable the future capability of small file
>  > compaction
>  > > > > stage post commit.
>  > > > > I should clarify this comment. if we are using the
>  > > > `WithPostCommitTopology`
>  > > > > for global committer, we would lose the capability of using the
> post
>  > > > commit
>  > > > > stage for small files compaction.
>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com
> <mailto:stevenz3wu@gmail.com >>
>  > > wrote:
>  > > > > >
>  > > > > > In the V1 sink interface, there is a GlobalCommitter for
> Iceberg.
>  > > With
>  > > > > the
>  > > > > > V2 sink interface, GlobalCommitter has been deprecated by
>  > > > > > WithPostCommitTopology. I thought the post commit stage is
> mainly
>  > for
>  > > > > async
>  > > > > > maintenance (like compaction).
>  > > > > >
>  > > > > > Are we supposed to do sth similar to the
>  > GlobalCommittingSinkAdapter?
>  > > > It
>  > > > > > seems like a temporary transition plan for bridging v1 sinks to
> v2
>  > > > > > interfaces.
>  > > > > >
>  > > > > > private class GlobalCommittingSinkAdapter extends
>  > > > > TwoPhaseCommittingSinkAdapter
>  > > > > > implements WithPostCommitTopology<InputT, CommT> {
>  > > > > > @Override
>  > > > > > public void
>  > > addPostCommitTopology(DataStream<CommittableMessage<CommT>>
>  > > > > committables) {
>  > > > > > StandardSinkTopologies.addGlobalCommitter(
>  > > > > > committables,
>  > > > > > GlobalCommitterAdapter::new,
>  > > > > > () -> sink.getCommittableSerializer().get());
>  > > > > > }
>  > > > > > }
>  > > > > >
>  > > > > >
>  > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
>  > used
>  > > > the
>  > > > > > "global" partitioner to force all committables go to a single
>  > > committer
>  > > > > > task 0. It will effectively force a global committer disguised
> in
>  > the
>  > > > > > parallel committers. It is a little weird and also can lead to
>  > > > questions
>  > > > > > why other committer tasks are not getting any messages. Plus, it
>  > will
>  > > > > > disable the future capability of small file compaction stage
> post
>  > > > commit.
>  > > > > > Hence, I am asking what is the right approach to achieve global
>  > > > committer
>  > > > > > behavior.
>  > > > > >
>  > > > > > Thanks,
>  > > > > > Steven
>  > > > > >
>  > > > > > [1]
> https://github.com/apache/iceberg/pull/4904/files#r946975047 <
> https://github.com/apache/iceberg/pull/4904/files#r946975047 > <
>  > > > > https://github.com/apache/iceberg/pull/4904/files#r946975047 <
> https://github.com/apache/iceberg/pull/4904/files#r946975047 > >
>  > > > > >
>  > > > >
>  > > >
>  > >
>  >
>

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi,
Very sorry for the late reply; I was on holiday.
Many thanks for the discussion. It also reminds me of
one more piece of background on the change to the GlobalCommitter:
when we refactored the job finish process in FLIP-147 to
ensure that all records can be committed at the end of a bounded
streaming job, we had to drop support for cascading commits,
which means the cascading commit of `committer -> global committer` does not work
in all cases.
For the current issues, one possible alternative from my side is that we
could support setting the committer parallelism to 1. Would this option solve
the issue in the current scenarios? I'll also double-check whether
it can be implemented, and look into the failed tests Krzysztof encountered.
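
To make the "committer parallelism 1" idea concrete, here is a minimal sketch in plain Java. It uses no Flink classes; `Committable` and `groupByCheckpoint` are hypothetical names introduced only for illustration. The assumption is that a single committer instance receives the committables of all writer subtasks and can therefore turn each checkpoint into exactly one table transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SingleCommitterSketch {

    /** Hypothetical committable: one data file produced by a writer subtask for a checkpoint. */
    static final class Committable {
        final long checkpointId;
        final String file;

        Committable(long checkpointId, String file) {
            this.checkpointId = checkpointId;
            this.file = file;
        }
    }

    /**
     * With only one committer instance, every committable arrives in the same place,
     * so the files of each checkpoint can be grouped into a single transaction.
     * The TreeMap keeps the transactions ordered by checkpoint id.
     */
    static Map<Long, List<String>> groupByCheckpoint(List<Committable> received) {
        Map<Long, List<String>> transactions = new TreeMap<>();
        for (Committable c : received) {
            transactions.computeIfAbsent(c.checkpointId, id -> new ArrayList<>()).add(c.file);
        }
        return transactions;
    }

    public static void main(String[] args) {
        // Committables from two writer subtasks, arriving out of checkpoint order.
        List<Committable> received = List.of(
                new Committable(2L, "w0-2.parquet"),
                new Committable(1L, "w0-1.parquet"),
                new Committable(1L, "w1-1.parquet"));

        // One entry per checkpoint means one table commit per checkpoint.
        System.out.println(groupByCheckpoint(received));
    }
}
```

In contrast to the "global" partitioner workaround, there would be no idle committer subtasks in this setup, and the post-commit stage would stay free for compaction.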
Best,
Yun
------------------------------------------------------------------
From:Steven Wu <st...@gmail.com>
Send Time:2022 Sep. 10 (Sat.) 11:31
To:dev <de...@flink.apache.org>
Cc:Yun Gao <yu...@aliyun.com>; hililiwei <hi...@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
Martijn, thanks a lot for chiming in!
Here are my concerns with adding GlobalCommitter in the PostCommitTopology:
1. When we use TwoPhaseCommittingSink, we would need to create a noop/dummy committer. The actual Iceberg/DeltaLake commits would happen in the PostCommit stage. The PostCommit stage should be doing some work after the commit (not for the commit).
2. GlobalCommitter is marked as @deprecated. It will be removed at a certain point. What then?
Thanks,
Steven
On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
Thanks Martijn,
 I'm actually trying to run our V1 Delta connector on Flink 1.15 using
 SinkV1Adapter with GlobalCommitterOperator.
 Having said that, I might have found a potential issue with
 GlobalCommitterOperator, checkpointing and failover recovery [1].
 For "normal" scenarios it does look good though.
 Regards,
 Krzysztof Chmielewski
 [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
 pt., 9 wrz 2022 o 20:49 Martijn Visser <martijnvisser@apache.org <mailto:martijnvisser@apache.org >>
 napisał(a):
 > Hi all,
 >
 > A couple of bits from when work was being done on the new sink: V1 is
 > completely simulated as V2 [1]. V2 is strictly more expressive.
 >
 > If there's desire to stick to the `GlobalCommitter` interface, have a
 > look at the StandardSinkTopologies. Or you can just add your own more
 > fitting PostCommitTopology. The important part to remember is that this
 > topology is lagging one checkpoint behind in terms of fault-tolerance: it
 > only receives data once the committer committed
 > on notifyCheckpointComplete. Thus, the global committer needs to be
 > idempotent and able to restore the actual state on recovery. That
 > limitation is coming in from Flink's checkpointing behaviour and applies to
 > both V1 and V2. GlobalCommitterOperator is abstracting these issues along
 > with handling retries (so commits that happen much later). So it's probably
 > a good place to start just with the standard topology.
 >
 > Best regards,
 >
 > Martijn
 >
 > [1]
 >
 > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370 <https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370 >
 >
 > Op do 8 sep. 2022 om 20:51 schreef Krzysztof Chmielewski <
 > krzysiek.chmielewski@gmail.com <mailto:krzysiek.chmielewski@gmail.com >>:
 >
 > > Hi,
 > > Krzysztof Chmielewski [1] from Delta-Flink connector open source
 > community
 > > here [2].
 > >
 > > I'm totally agree with Steven on this. Sink's V1 GlobalCommitter is
 > > something exactly what Flink-Delta Sink needs since it is the place where
 > > we do an actual commit to the Delta Log which should be done from a one
 > > place/instance.
 > >
 > > Currently I'm evaluating V2 for our connector and having, how Steven
 > > described it a "more natural, built-in concept/support of GlobalCommitter
 > > in the sink v2 interface" would be greatly appreciated.
 > >
 > > Cheers,
 > > Krzysztof Chmielewski
 > >
 > > [1] https://github.com/kristoffSC <https://github.com/kristoffSC >
 > > [2] https://github.com/delta-io/connectors/tree/master/flink <https://github.com/delta-io/connectors/tree/master/flink >
 > >
 > > czw., 8 wrz 2022 o 19:51 Steven Wu <stevenz3wu@gmail.com <mailto:stevenz3wu@gmail.com >> napisał(a):
 > >
 > > > Hi Yun,
 > > >
 > > > Thanks a lot for the reply!
 > > >
 > > > While we can add the global committer in the WithPostCommitTopology,
 > the
 > > > semantics are weird. The Commit stage actually didn't commit anything
 > to
 > > > the Iceberg table, and the PostCommit stage is where the Iceberg commit
 > > > happens.
 > > >
 > > > I just took a quick look at DeltaLake Flink sink. It still uses the V1
 > > sink
 > > > interface [1]. I think it might have the same issue when switching to
 > the
 > > > V2 sink interface.
 > > >
 > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
 > > global
 > > > transactional commit, it would be more natural to have a built-in
 > > > concept/support of GlobalCommitter in the sink v2 interface.
 > > >
 > > > Thanks,
 > > > Steven
 > > >
 > > > [1]
 > > >
 > > >
 > >
 > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java <https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java >
 > > >
 > > >
 > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid>
 > > > wrote:
 > > >
 > > > > Hi Steven, Liwei,
 > > > > Very sorry for missing this mail and response very late.
 > > > > I think the initial thought is indeed to use `WithPostCommitTopology`
 > > as
 > > > > a replacement of the original GlobalCommitter, and currently the
 > > adapter
 > > > of
 > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink V1
 > > > > interface
 > > > > onto an implementation of `WithPostCommitTopology`.
 > > > > Since `WithPostCommitTopology` supports arbitrary subgraph, thus It
 > > seems
 > > > > to
 > > > > me it could support both global committer and small file compaction?
 > We
 > > > > might
 > > > > have an `WithPostCommitTopology` implementation like
 > > > > DataStream ds = add global committer;
 > > > > if (enable file compaction) {
 > > > > build the compaction subgraph from ds
 > > > > }
 > > > > Best,
 > > > > Yun
 > > > > [1]
 > > > >
 > > >
 > >
 > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 <https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 >
 > > > > <
 > > > >
 > > >
 > >
 > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 <https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 >
 > > > > >
 > > > > ------------------------------------------------------------------
 > > > > From:Steven Wu <stevenz3wu@gmail.com <mailto:stevenz3wu@gmail.com >>
 > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
 > > > > To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org >>; hililiwei <hililiwei@gmail.com <mailto:hililiwei@gmail.com >>
 > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
 > > > > > Plus, it will disable the future capability of small file
 > compaction
 > > > > stage post commit.
 > > > > I should clarify this comment. if we are using the
 > > > `WithPostCommitTopology`
 > > > > for global committer, we would lose the capability of using the post
 > > > commit
 > > > > stage for small files compaction.
 > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz3wu@gmail.com <mailto:stevenz3wu@gmail.com >>
 > > wrote:
 > > > > >
 > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
 > > With
 > > > > the
 > > > > > V2 sink interface, GlobalCommitter has been deprecated by
 > > > > > WithPostCommitTopology. I thought the post commit stage is mainly
 > for
 > > > > async
 > > > > > maintenance (like compaction).
 > > > > >
 > > > > > Are we supposed to do sth similar to the
 > GlobalCommittingSinkAdapter?
 > > > It
 > > > > > seems like a temporary transition plan for bridging v1 sinks to v2
 > > > > > interfaces.
 > > > > >
 > > > > > private class GlobalCommittingSinkAdapter extends
 > > > > TwoPhaseCommittingSinkAdapter
 > > > > > implements WithPostCommitTopology<InputT, CommT> {
 > > > > > @Override
 > > > > > public void
 > > addPostCommitTopology(DataStream<CommittableMessage<CommT>>
 > > > > committables) {
 > > > > > StandardSinkTopologies.addGlobalCommitter(
 > > > > > committables,
 > > > > > GlobalCommitterAdapter::new,
 > > > > > () -> sink.getCommittableSerializer().get());
 > > > > > }
 > > > > > }
 > > > > >
 > > > > >
 > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
 > used
 > > > the
 > > > > > "global" partitioner to force all committables go to a single
 > > committer
 > > > > > task 0. It will effectively force a global committer disguised in
 > the
 > > > > > parallel committers. It is a little weird and also can lead to
 > > > questions
 > > > > > why other committer tasks are not getting any messages. Plus, it
 > will
 > > > > > disable the future capability of small file compaction stage post
 > > > commit.
 > > > > > Hence, I am asking what is the right approach to achieve global
 > > > committer
 > > > > > behavior.
 > > > > >
 > > > > > Thanks,
 > > > > > Steven
 > > > > >
 > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047 <https://github.com/apache/iceberg/pull/4904/files#r946975047 > <
 > > > > https://github.com/apache/iceberg/pull/4904/files#r946975047 <https://github.com/apache/iceberg/pull/4904/files#r946975047 > >
 > > > > >
 > > > >
 > > >
 > >
 >

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Steven Wu <st...@gmail.com>.
Martijn, thanks a lot for chiming in!

Here are my concerns with adding GlobalCommitter in the PostCommitTopology:
1. When we use TwoPhaseCommittingSink, we would need to create a noop/dummy
committer. The actual Iceberg/DeltaLake commits would happen in the PostCommit
stage. The PostCommit stage should be doing some work after the commit (not for
the commit).
2. GlobalCommitter is marked as @deprecated. It will be removed at a
certain point. What then?
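
A rough sketch of the first concern, in plain Java rather than the real Flink interfaces: `SimpleCommitter`, `NoopCommitter` and `PostCommitGlobalCommit` are hypothetical stand-ins, not Flink or Iceberg classes. It only illustrates the shape of the problem, namely that the stage named "commit" forwards data unchanged while the stage named "post commit" is the one that actually changes the table.

```java
import java.util.ArrayList;
import java.util.List;

public class NoopCommitterSketch {

    /** Hypothetical, simplified committer contract (not Flink's Committer interface). */
    interface SimpleCommitter {
        List<String> commit(List<String> committables);
    }

    /** The "commit" stage: forwards committables and never touches the table. */
    static final class NoopCommitter implements SimpleCommitter {
        @Override
        public List<String> commit(List<String> committables) {
            return new ArrayList<>(committables);
        }
    }

    /** The "post commit" stage: this is where the real table commit would happen. */
    static final class PostCommitGlobalCommit {
        final List<List<String>> tableSnapshots = new ArrayList<>();

        void apply(List<String> forwarded) {
            // Stand-in for a single transactional commit to the table.
            tableSnapshots.add(new ArrayList<>(forwarded));
        }
    }

    public static void main(String[] args) {
        NoopCommitter committer = new NoopCommitter();
        PostCommitGlobalCommit postCommit = new PostCommitGlobalCommit();

        List<String> forwarded = committer.commit(List.of("a.parquet", "b.parquet"));
        // Nothing has been committed to the table at this point.
        System.out.println("snapshots after commit stage: " + postCommit.tableSnapshots.size());

        postCommit.apply(forwarded);
        System.out.println("snapshots after post-commit stage: " + postCommit.tableSnapshots.size());
    }
}
```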

Thanks,
Steven

On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
krzysiek.chmielewski@gmail.com> wrote:

> Thanks Martijn,
> I'm actually trying to run our V1 Delta connector on Flink 1.15 using
> SinkV1Adapter with GlobalCommitterOperator.
>
> Having said that, I might have found a potential issue with
> GlobalCommitterOperator, checkpointing and failover recovery [1].
> For "normal" scenarios it does look good though.
>
> Regards,
> Krzysztof Chmielewski
>
> [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>
> pt., 9 wrz 2022 o 20:49 Martijn Visser <ma...@apache.org>
> napisał(a):
>
> > Hi all,
> >
> > A couple of bits from when work was being done on the new sink: V1 is
> > completely simulated as V2 [1]. V2 is strictly more expressive.
> >
> > If there's desire to stick to the `GlobalCommitter` interface, have a
> > look at the StandardSinkTopologies. Or you can just add your own more
> > fitting PostCommitTopology. The important part to remember is that this
> > topology is lagging one checkpoint behind in terms of fault-tolerance: it
> > only receives data once the committer committed
> > on notifyCheckpointComplete. Thus, the global committer needs to be
> > idempotent and able to restore the actual state on recovery. That
> > limitation is coming in from Flink's checkpointing behaviour and applies
> to
> > both V1 and V2. GlobalCommitterOperator is abstracting these issues along
> > with handling retries (so commits that happen much later). So it's
> probably
> > a good place to start just with the standard topology.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> >
> > Op do 8 sep. 2022 om 20:51 schreef Krzysztof Chmielewski <
> > krzysiek.chmielewski@gmail.com>:
> >
> > > Hi,
> > > Krzysztof Chmielewski [1] from Delta-Flink connector open source
> > community
> > > here [2].
> > >
> > > I'm totally agree with Steven on this. Sink's V1 GlobalCommitter is
> > > something exactly what Flink-Delta Sink needs since it is the place
> where
> > > we do an actual commit to the Delta Log which should be done from a one
> > > place/instance.
> > >
> > > Currently I'm evaluating V2 for our connector and having, how Steven
> > > described it a "more natural, built-in concept/support of
> GlobalCommitter
> > > in the sink v2 interface" would be greatly appreciated.
> > >
> > > Cheers,
> > > Krzysztof Chmielewski
> > >
> > > [1] https://github.com/kristoffSC
> > > [2] https://github.com/delta-io/connectors/tree/master/flink
> > >
> > > czw., 8 wrz 2022 o 19:51 Steven Wu <st...@gmail.com> napisał(a):
> > >
> > > > Hi Yun,
> > > >
> > > > Thanks a lot for the reply!
> > > >
> > > > While we can add the global committer in the WithPostCommitTopology,
> > the
> > > > semantics are weird. The Commit stage actually didn't commit anything
> > to
> > > > the Iceberg table, and the PostCommit stage is where the Iceberg
> commit
> > > > happens.
> > > >
> > > > I just took a quick look at DeltaLake Flink sink. It still uses the
> V1
> > > sink
> > > > interface [1]. I think it might have the same issue when switching to
> > the
> > > > V2 sink interface.
> > > >
> > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
> > > global
> > > > transactional commit, it would be more natural to have a built-in
> > > > concept/support of GlobalCommitter in the sink v2 interface.
> > > >
> > > > Thanks,
> > > > Steven
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> > > >
> > > >
> > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao.gy@aliyun.com.invalid
> >
> > > > wrote:
> > > >
> > > > > Hi Steven, Liwei,
> > > > > Very sorry for missing this mail and response very late.
> > > > > I think the initial thought is indeed to use
> `WithPostCommitTopology`
> > > as
> > > > > a replacement of the original GlobalCommitter, and currently the
> > > adapter
> > > > of
> > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink V1
> > > > > interface
> > > > > onto an implementation of `WithPostCommitTopology`.
> > > > > Since `WithPostCommitTopology` supports arbitrary subgraph, thus It
> > > seems
> > > > > to
> > > > > me it could support both global committer and small file
> compaction?
> > We
> > > > > might
> > > > > have an `WithPostCommitTopology` implementation like
> > > > > DataStream ds = add global committer;
> > > > > if (enable file compaction) {
> > > > >  build the compaction subgraph from ds
> > > > > }
> > > > > Best,
> > > > > Yun
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > > > > <
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > > > > >
> > > > > ------------------------------------------------------------------
> > > > > From:Steven Wu <st...@gmail.com>
> > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
> > > > > To:dev <de...@flink.apache.org>; hililiwei <hi...@gmail.com>
> > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
> > > > > > Plus, it will disable the future capability of small file
> > compaction
> > > > > stage post commit.
> > > > > I should clarify this comment. if we are using the
> > > > `WithPostCommitTopology`
> > > > > for global committer, we would lose the capability of using the
> post
> > > > commit
> > > > > stage for small files compaction.
> > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <st...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
> > > With
> > > > > the
> > > > > > V2 sink interface, GlobalCommitter has been deprecated by
> > > > > > WithPostCommitTopology. I thought the post commit stage is mainly
> > for
> > > > > async
> > > > > > maintenance (like compaction).
> > > > > >
> > > > > > Are we supposed to do sth similar to the
> > GlobalCommittingSinkAdapter?
> > > > It
> > > > > > seems like a temporary transition plan for bridging v1 sinks to
> v2
> > > > > > interfaces.
> > > > > >
> > > > > > private class GlobalCommittingSinkAdapter extends
> > > > > TwoPhaseCommittingSinkAdapter
> > > > > > implements WithPostCommitTopology<InputT, CommT> {
> > > > > > @Override
> > > > > > public void
> > > addPostCommitTopology(DataStream<CommittableMessage<CommT>>
> > > > > committables) {
> > > > > > StandardSinkTopologies.addGlobalCommitter(
> > > > > > committables,
> > > > > > GlobalCommitterAdapter::new,
> > > > > > () -> sink.getCommittableSerializer().get());
> > > > > > }
> > > > > > }
> > > > > >
> > > > > >
> > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
> > used
> > > > the
> > > > > > "global" partitioner to force all committables go to a single
> > > committer
> > > > > > task 0. It will effectively force a global committer disguised in
> > the
> > > > > > parallel committers. It is a little weird and also can lead to
> > > > questions
> > > > > > why other committer tasks are not getting any messages. Plus, it
> > will
> > > > > > disable the future capability of small file compaction stage post
> > > > commit.
> > > > > > Hence, I am asking what is the right approach to achieve global
> > > > committer
> > > > > > behavior.
> > > > > >
> > > > > > Thanks,
> > > > > > Steven
> > > > > >
> > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
> <
> > > > > https://github.com/apache/iceberg/pull/4904/files#r946975047 >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Thanks Martijn,
I'm actually trying to run our V1 Delta connector on Flink 1.15 using
SinkV1Adapter with GlobalCommitterOperator.

Having said that, I might have found a potential issue with
GlobalCommitterOperator, checkpointing and failover recovery [1].
For "normal" scenarios it does look good though.

Regards,
Krzysztof Chmielewski

[1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc

pt., 9 wrz 2022 o 20:49 Martijn Visser <ma...@apache.org>
napisał(a):

> Hi all,
>
> A couple of bits from when work was being done on the new sink: V1 is
> completely simulated as V2 [1]. V2 is strictly more expressive.
>
> If there's desire to stick to the `GlobalCommitter` interface, have a
> look at the StandardSinkTopologies. Or you can just add your own more
> fitting PostCommitTopology. The important part to remember is that this
> topology is lagging one checkpoint behind in terms of fault-tolerance: it
> only receives data once the committer committed
> on notifyCheckpointComplete. Thus, the global committer needs to be
> idempotent and able to restore the actual state on recovery. That
> limitation is coming in from Flink's checkpointing behaviour and applies to
> both V1 and V2. GlobalCommitterOperator is abstracting these issues along
> with handling retries (so commits that happen much later). So it's probably
> a good place to start just with the standard topology.
>
> Best regards,
>
> Martijn
>
> [1]
>
> https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>
> Op do 8 sep. 2022 om 20:51 schreef Krzysztof Chmielewski <
> krzysiek.chmielewski@gmail.com>:
>
> > Hi,
> > Krzysztof Chmielewski [1] from Delta-Flink connector open source
> community
> > here [2].
> >
> > I'm totally agree with Steven on this. Sink's V1 GlobalCommitter is
> > something exactly what Flink-Delta Sink needs since it is the place where
> > we do an actual commit to the Delta Log which should be done from a one
> > place/instance.
> >
> > Currently I'm evaluating V2 for our connector and having, how Steven
> > described it a "more natural, built-in concept/support of GlobalCommitter
> > in the sink v2 interface" would be greatly appreciated.
> >
> > Cheers,
> > Krzysztof Chmielewski
> >
> > [1] https://github.com/kristoffSC
> > [2] https://github.com/delta-io/connectors/tree/master/flink
> >
> > czw., 8 wrz 2022 o 19:51 Steven Wu <st...@gmail.com> napisał(a):
> >
> > > Hi Yun,
> > >
> > > Thanks a lot for the reply!
> > >
> > > While we can add the global committer in the WithPostCommitTopology,
> the
> > > semantics are weird. The Commit stage actually didn't commit anything
> to
> > > the Iceberg table, and the PostCommit stage is where the Iceberg commit
> > > happens.
> > >
> > > I just took a quick look at DeltaLake Flink sink. It still uses the V1
> > sink
> > > interface [1]. I think it might have the same issue when switching to
> the
> > > V2 sink interface.
> > >
> > > For data lake storages (like Iceberg, DeltaLake) or any storage with
> > global
> > > transactional commit, it would be more natural to have a built-in
> > > concept/support of GlobalCommitter in the sink v2 interface.
> > >
> > > Thanks,
> > > Steven
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> > >
> > >
> > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid>
> > > wrote:
> > >
> > > > Hi Steven, Liwei,
> > > > Very sorry for missing this mail and response very late.
> > > > I think the initial thought is indeed to use `WithPostCommitTopology`
> > as
> > > > a replacement of the original GlobalCommitter, and currently the
> > adapter
> > > of
> > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink V1
> > > > interface
> > > > onto an implementation of `WithPostCommitTopology`.
> > > > Since `WithPostCommitTopology` supports arbitrary subgraph, thus It
> > seems
> > > > to
> > > > me it could support both global committer and small file compaction?
> We
> > > > might
> > > > have an `WithPostCommitTopology` implementation like
> > > > DataStream ds = add global committer;
> > > > if (enable file compaction) {
> > > >  build the compaction subgraph from ds
> > > > }
> > > > Best,
> > > > Yun
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > > > <
> > > >
> > >
> >

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Martijn Visser <ma...@apache.org>.
Hi all,

A couple of notes from when the new sink was being developed: V1 is
completely simulated as V2 [1], and V2 is strictly more expressive.

If there is a desire to stick to the `GlobalCommitter` interface, have a
look at the StandardSinkTopologies. Or you can add your own, more
fitting, PostCommitTopology. The important part to remember is that this
topology lags one checkpoint behind in terms of fault tolerance: it
only receives data once the committer has committed on
notifyCheckpointComplete. Thus, the global committer needs to be
idempotent and able to restore the actual state on recovery. That
limitation comes from Flink's checkpointing behaviour and applies to
both V1 and V2. GlobalCommitterOperator abstracts these issues away, along
with handling retries (that is, commits that happen much later). So the
standard topology is probably a good place to start.
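
To make the idempotency requirement concrete, here is a minimal sketch in
plain Java. It is not Flink API code: `FakeTable` and
`IdempotentGlobalCommitter` are hypothetical names, and the sketch assumes
the external table can store the id of the last checkpoint it committed
together with the data. Under that assumption, a committer that is restarted
and handed the same committables again can detect the replay and skip it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an external table; not a Flink or Iceberg class. */
final class FakeTable {
    private long lastCommittedCheckpointId = -1L;
    private final List<String> files = new ArrayList<>();

    long lastCommittedCheckpointId() {
        return lastCommittedCheckpointId;
    }

    List<String> files() {
        return new ArrayList<>(files);
    }

    /** Records the new files together with the checkpoint id that produced them. */
    void commit(long checkpointId, List<String> newFiles) {
        files.addAll(newFiles);
        lastCommittedCheckpointId = checkpointId;
    }
}

/** Hypothetical global committer that stays correct when commits are replayed. */
final class IdempotentGlobalCommitter {
    private final FakeTable table;

    IdempotentGlobalCommitter(FakeTable table) {
        this.table = table;
    }

    /** Returns true if the files were committed, false if the call was a replay. */
    boolean commit(long checkpointId, List<String> dataFiles) {
        // The "actual state" is restored from the table itself, not from memory,
        // so a freshly restarted committer reaches the same decision.
        if (checkpointId <= table.lastCommittedCheckpointId()) {
            return false;
        }
        table.commit(checkpointId, dataFiles);
        return true;
    }
}
```

The design choice in this sketch is that the committer keeps no state of its
own: replaying checkpoint 1 after a restart is a no-op, while checkpoint 2 is
committed as usual.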

Best regards,

Martijn

[1]
https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370

On Thu, Sep 8, 2022 at 20:51, Krzysztof Chmielewski <
krzysiek.chmielewski@gmail.com> wrote:

> Hi,
> Krzysztof Chmielewski [1] from the Delta-Flink connector open source
> community here [2].
>
> I totally agree with Steven on this. Sink V1's GlobalCommitter is
> exactly what the Flink-Delta Sink needs, since it is the place where
> we do the actual commit to the Delta Log, which should be done from a
> single place/instance.
>
> I'm currently evaluating V2 for our connector, and having, as Steven
> described it, a "more natural, built-in concept/support of GlobalCommitter
> in the sink v2 interface" would be greatly appreciated.
>
> Cheers,
> Krzysztof Chmielewski
>
> [1] https://github.com/kristoffSC
> [2] https://github.com/delta-io/connectors/tree/master/flink

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Hi,
Krzysztof Chmielewski [1] from the Delta-Flink connector open source
community here [2].

I totally agree with Steven on this. Sink V1's GlobalCommitter is
exactly what the Flink-Delta Sink needs, since it is the place where
we do the actual commit to the Delta Log, which should be done from a
single place/instance.

I'm currently evaluating V2 for our connector, and having, as Steven
described it, a "more natural, built-in concept/support of GlobalCommitter
in the sink v2 interface" would be greatly appreciated.

Cheers,
Krzysztof Chmielewski

[1] https://github.com/kristoffSC
[2] https://github.com/delta-io/connectors/tree/master/flink

On Thu, Sep 8, 2022 at 19:51, Steven Wu <st...@gmail.com> wrote:

> Hi Yun,
>
> Thanks a lot for the reply!
>
> While we can add the global committer in the WithPostCommitTopology, the
> semantics are weird. The Commit stage would not actually commit anything to
> the Iceberg table, and the PostCommit stage is where the Iceberg commit
> would happen.
>
> I just took a quick look at DeltaLake Flink sink. It still uses the V1 sink
> interface [1]. I think it might have the same issue when switching to the
> V2 sink interface.
>
> For data lake storages (like Iceberg, DeltaLake) or any storage with global
> transactional commit, it would be more natural to have a built-in
> concept/support of GlobalCommitter in the sink v2 interface.
>
> Thanks,
> Steven
>
> [1]
>
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Steven Wu <st...@gmail.com>.
Hi Yun,

Thanks a lot for the reply!

While we can add the global committer in the WithPostCommitTopology, the
semantics are weird. The Commit stage would not actually commit anything to
the Iceberg table, and the PostCommit stage is where the Iceberg commit
would happen.

I just took a quick look at DeltaLake Flink sink. It still uses the V1 sink
interface [1]. I think it might have the same issue when switching to the
V2 sink interface.

For data lake storages (like Iceberg, DeltaLake) or any storage with global
transactional commit, it would be more natural to have a built-in
concept/support of GlobalCommitter in the sink v2 interface.

Thanks,
Steven

[1]
https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java


On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yu...@aliyun.com.invalid> wrote:

> Hi Steven, Liwei,
> Very sorry for missing this mail and responding so late.
> I think the initial thought is indeed to use `WithPostCommitTopology` as
> a replacement for the original GlobalCommitter, and currently the adapter
> of Sink v1 on top of Sink v2 also maps the GlobalCommitter in the Sink V1
> interface onto an implementation of `WithPostCommitTopology`.
> Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems
> to me it could support both a global committer and small file compaction?
> We might have a `WithPostCommitTopology` implementation like
> DataStream ds = add global committer;
> if (enable file compaction) {
>  build the compaction subgraph from ds
> }
> Best,
> Yun
> [1]
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Steven, Liwei, 
Very sorry for missing this mail and responding so late.
I think the initial thought is indeed to use `WithPostCommitTopology` as
a replacement for the original GlobalCommitter, and currently the adapter of
Sink v1 on top of Sink v2 also maps the GlobalCommitter in the Sink V1 interface
onto an implementation of `WithPostCommitTopology`.
Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems to
me it could support both a global committer and small file compaction? We might
have a `WithPostCommitTopology` implementation like
DataStream ds = add global committer;
if (enable file compaction) {
 build the compaction subgraph from ds
}
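
The same idea can be modelled as a toy in plain Java. This is only a sketch
of the ordering, not Flink API code: `PostCommitSketch`, `globalCommit` and
`compact` are hypothetical names, lists of file names stand in for the
committable stream, and a real topology would be built from DataStream
operators instead of function composition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Toy model of a post-commit stage: global commit first, compaction optional. */
final class PostCommitSketch {

    /** Stand-in for the global committer: "commits" the files and forwards them. */
    static List<String> globalCommit(List<String> files) {
        return new ArrayList<>(files);
    }

    /** Stand-in for compaction: merges several committed files into one. */
    static List<String> compact(List<String> committedFiles) {
        if (committedFiles.size() < 2) {
            return committedFiles;
        }
        return Collections.singletonList(
                "compacted(" + String.join("+", committedFiles) + ")");
    }

    /** Builds the stage; compaction is attached only when it is enabled. */
    static Function<List<String>, List<String>> build(boolean enableFileCompaction) {
        Function<List<String>, List<String>> topology = PostCommitSketch::globalCommit;
        if (enableFileCompaction) {
            // Compaction consumes what the global committer emitted.
            topology = topology.andThen(PostCommitSketch::compact);
        }
        return topology;
    }
}
```

With compaction disabled the committed files pass through unchanged. With it
enabled, they are merged after the global commit has run.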
Best,
Yun
[1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
------------------------------------------------------------------
From:Steven Wu <st...@gmail.com>
Send Time:2022 Aug. 17 (Wed.) 07:30
To:dev <de...@flink.apache.org>; hililiwei <hi...@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
> Plus, it will disable the future capability of small file compaction
stage post commit.
I should clarify this comment. If we are using the `WithPostCommitTopology`
for the global committer, we would lose the capability of using the post commit
stage for small file compaction.

Re: Sink V2 interface replacement for GlobalCommitter

Posted by Steven Wu <st...@gmail.com>.
>  Plus, it will disable the future capability of small file compaction
stage post commit.

I should clarify this comment. If we are using the `WithPostCommitTopology`
for the global committer, we would lose the capability of using the post commit
stage for small file compaction.

On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <st...@gmail.com> wrote:

>
> In the V1 sink interface, there is a GlobalCommitter for Iceberg. With the
> V2 sink interface,  GlobalCommitter has been deprecated by
> WithPostCommitTopology. I thought the post commit stage is mainly for async
> maintenance (like compaction).
>
> Are we supposed to do sth similar to the GlobalCommittingSinkAdapter? It
> seems like a temporary transition plan for bridging v1 sinks to v2
> interfaces.
>
> private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>         implements WithPostCommitTopology<InputT, CommT> {
>     @Override
>     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>        StandardSinkTopologies.addGlobalCommitter(
>                 committables,
>                 GlobalCommitterAdapter::new,
>                 () -> sink.getCommittableSerializer().get());
>     }
> }
>
>
> In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
> "global" partitioner to force all committables go to a single committer
> task 0. It will effectively force a global committer disguised in the
> parallel committers. It is a little weird and also can lead to questions
> why other committer tasks are not getting any messages. Plus, it will
> disable the future capability of small file compaction stage post commit.
> Hence, I am asking what is the right approach to achieve global committer
> behavior.
>
> Thanks,
> Steven
>
> [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
>