Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2021/04/13 19:14:32 UTC

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thanks all for this discussion. Looks like there are lots of ideas and
folks that are eager to do things, so let's see how we can get this moving.

My take on this is the following:

There will probably not be one Hybrid source, but possibly multiple ones,
because of different strategies/requirements.
    - One may be very simple, with switching points known up-front. It would
be good to have this in a very simple implementation.
    - There may be one where the switch is dynamic and the readers need to
report back where they left off.
    - There may be one that switches back and forth multiple times during a
job, for example Kafka going to DFS whenever it falls behind retention, in
order to catch up again.
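To make the first (simplest) strategy concrete, here is a minimal plain-Java sketch with no Flink dependency. FixedRangeSource and FixedSwitchChain are hypothetical names, and modelling each source as a pre-configured timestamp range is an assumption made for illustration: when the ranges are contiguous, reading them in order gives neither a gap nor duplicates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (not a Flink API): a source that emits only the event
// timestamps inside its pre-configured range [startInclusive, endExclusive).
record FixedRangeSource(String name, long startInclusive, long endExclusive) {
    List<Long> read(List<Long> events) {
        List<Long> out = new ArrayList<>();
        for (long ts : events) {
            if (ts >= startInclusive && ts < endExclusive) {
                out.add(ts);
            }
        }
        return out;
    }
}

// Simplest hybrid strategy: switching points are known up-front, so the chain
// reads each source to completion and then moves on to the next one.
final class FixedSwitchChain {
    static List<Long> readAll(List<FixedRangeSource> chain, List<Long> events) {
        List<Long> out = new ArrayList<>();
        for (FixedRangeSource source : chain) {
            out.addAll(source.read(events));
        }
        return out;
    }
}
```

For example, a "files" range [0, 10) followed by a "kafka" range [10, Long.MAX_VALUE) over the events 3, 12, 7, 15 yields 3, 7, 12, 15.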

This also seems hard to "design on paper"; I expect there are nuances in a
production setup that affect some details of the design. So I'd feel most
comfortable adding a variant of the hybrid source to Flink that has already
been used in a real use case (not necessarily in production, but maybe
in a testing/staging environment, so that we have some confidence it meets
all requirements).


What do you think about the following approach?
  - If there is a tested PoC, let's try to get it contributed to Flink
without trying to make it much more general.
  - When we see similar but a bit different requirements for another hybrid
source, then let's try to evolve the contributed one.
  - If we see new requirements that are so different that they don't fit
well with the existing hybrid source, then let us look at building a second
hybrid source for those requirements.

We need to make connector contributions easier in general, and I think
it is not a bad thing to end up with different approaches and see how these
play out against each other when used by users. For example, switching
with known boundaries, dynamic switching, back-and-forth switching, etc.
(I know some committers are planning to do some work on making
connector contributions easier, with standardized testing frameworks,
decoupled CI, etc.)

Best,
Stephan


On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <th...@apache.org> wrote:

> Hi,
>
> As mentioned in my previous email, I had been working on a prototype for
> the hybrid source.
>
> You can find it at https://github.com/tweise/flink/pull/1
>
> It contains:
> * Switching with configurable chain of sources
> * Fixed or dynamic start positions
> * Test with MockSource and FileSource
>
> The purpose of the above PR is to gather feedback and help drive consensus
> on the FLIP.
>
> * How to support a dynamic start position within the source chain?
>
> Relevant in those (few?) cases where start positions are not known upfront.
> You can find an example of what that might look like in the tests:
>
>
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
>
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
>
> When switching, the enumerator of the previous source needs to
> supply information about consumed splits that allows setting the start
> position for the next source. That could be something like the last
> processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> doesn't track finished splits.)
>
> See previous discussion regarding start/end position. The prototype shows
> the use of checkpoint state with converter function.
>
> * Should readers be deployed dynamically?
>
> The prototype assumes a static source chain that is fixed at job submission
> time. Conceivably there could be use cases that require more flexibility.
> Such as switching one KafkaSource for another. A step in that direction
> would be to deploy the actual readers dynamically, at the time of switching
> source.
>
> Looking forward to feedback and suggestions for next steps!
>
> Thomas
>
> On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <th...@apache.org> wrote:
>
> > Hi Nicholas,
> >
> > Thanks for the reply. I have implemented a small PoC. It switches a
> > configurable sequence of sources with predefined bounds. I'm using the
> > unmodified MockSource for illustration. It does not require a
> > "Switchable" interface. I looked at the code you shared, and the
> > delegation and signaling work quite similarly. That's a good validation.
> >
> > Hi Kezhu,
> >
> > Thanks for bringing up the more detailed discussion regarding the start/end
> > position. I think in most cases the start and end positions will be known
> > when the job is submitted. If we take a File -> Kafka source chain as
> > example, there would most likely be a timestamp at which we want to
> > transition from files to reading from Kafka. So we would either set the
> > start position for Kafka based on that timestamp or provide the offsets
> > directly. (Note that I'm skipping a few related nuances here. In order to
> > achieve an exact switch without duplication or gap, we may also need some
> > overlap and filtering, but that's a separate issue.)
> >
> > The point is that the start positions can be configured by the user;
> > there is no need to transfer any information from one source to another
> > as part of switching.
> >
> > It gets more complicated if we want to achieve a dynamic switch where the
> > transition timestamp isn't known when the job starts. For example,
> > consider a bootstrap scenario where the time taken to process historic
> > data exceeds the Kafka retention. Here, we would need to dynamically
> > resolve the Kafka start position based on where the file readers left
> > off, when the switching occurs. The file source enumerator would
> > determine at runtime when it is done handing splits to its readers,
> > maybe when the max file timestamp reaches (processing time - X). This
> > information needs to be transferred to the Kafka source.
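A minimal sketch of the runtime end condition described above, assuming each file carries a single timestamp that bounds the records it contains. CatchUpTracker is a hypothetical helper, not part of the FileSource API; it only tracks the newest file handed to readers, reports when that file is within X of processing time, and exposes the timestamp that would be passed on to Kafka.

```java
// Hypothetical helper, not part of the Flink FileSource API.
final class CatchUpTracker {
    private final long lagThresholdMillis; // the "X" from the discussion
    private long maxAssignedTimestamp = Long.MIN_VALUE;

    CatchUpTracker(long lagThresholdMillis) {
        this.lagThresholdMillis = lagThresholdMillis;
    }

    // Called whenever the file enumerator hands a split to a reader.
    void onSplitAssigned(long fileTimestamp) {
        maxAssignedTimestamp = Math.max(maxAssignedTimestamp, fileTimestamp);
    }

    // Done once the newest assigned file is within X of the current processing time.
    boolean isDone(long processingTimeMillis) {
        return maxAssignedTimestamp != Long.MIN_VALUE
                && maxAssignedTimestamp >= processingTimeMillis - lagThresholdMillis;
    }

    // The position that has to be transferred to the Kafka source at switch time.
    long endTimestamp() {
        if (maxAssignedTimestamp == Long.MIN_VALUE) {
            throw new IllegalStateException("no split assigned yet");
        }
        return maxAssignedTimestamp;
    }
}
```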
> >
> > The timestamp would need to be derived from the file enumerator state,
> > either by looking at the last splits or explicitly. The natural way to do
> > that is to introspect the enumerator state which gets checkpointed. Any
> > other form of "end position" via a special interface would need to be
> > derived in the same manner.
> >
> > The converter that will be provided by the user would look at the file
> > enumerator state, derive the timestamp and then supply the "start
> > position" to the Kafka source. The Kafka source was created when the job
> > started. It needs to be augmented with the new start position. That can
> > be achieved via a special enumerator interface like
> > SwitchableSplitEnumerator#setStartState or by using restoreEnumerator with
> > the checkpoint state constructed by the converter function. I'm leaning
> > towards the latter as long as there is a convenient way to construct the
> > state from a position (like enumStateForTimestamp). The converter would
> > map one enum state to another and can be made very simple by providing a
> > few utility functions instead of mandating a new interface that
> > enumerators need to implement to become switchable.
> >
> > Again, a converter is only required when sources need to be switched
> > based on positions not known at graph construction time.
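The converter idea can be sketched as a plain java.util.function.Function between two enumerator states. FileEnumState, KafkaEnumState and enumStateForTimestamp are hypothetical stand-ins (the real connector checkpoint classes look different); the point is only that, given such a utility, the user-supplied converter shrinks to a one-liner whose output would then be handed to restoreEnumerator.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical enumerator checkpoint states (the real connector classes differ).
record FileEnumState(List<String> finishedFiles, long maxFileTimestamp) {}

record KafkaEnumState(long startTimestamp) {}

final class Converters {
    // Hypothetical utility in the spirit of "enumStateForTimestamp":
    // builds a downstream enumerator state from nothing but a start position.
    static KafkaEnumState enumStateForTimestamp(long timestamp) {
        return new KafkaEnumState(timestamp);
    }

    // With such a utility, the user-supplied converter is a one-liner.
    static final Function<FileEnumState, KafkaEnumState> FILE_TO_KAFKA =
            fileState -> enumStateForTimestamp(fileState.maxFileTimestamp());
}
```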
> >
> > I'm planning to add such deferred switching to the PoC for illustration
> > and will share the experiment when that's done.
> >
> > Cheers,
> > Thomas
> >
> >
> > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <pr...@163.com> wrote:
> >
> >> Hi Kezhu,
> >>
> >> Thanks for your detailed points on the Hybrid Source. I have gone
> >> through them and respond to each one as follows:
> >>
> >> 1. Would it be possible to use the Hybrid Source to switch/chain
> >> multiple homogeneous sources?
> >>
> >> "HybridSource" supports switching/chaining multiple homogeneous sources,
> >> provided they implement "SwitchableSource" and
> >> "SwitchableSplitEnumerator". "HybridSource" doesn't restrict whether the
> >> constituent sources are homogeneous. From the user's perspective, the
> >> user only adds the "SwitchableSource"s to "HybridSource" and leaves the
> >> smooth migration to "HybridSource".
> >>
> >> 2. Is "setStartState" actually a reposition operation for the next source
> >> to start at job runtime?
> >>
> >> IMO, "setStartState" is used to determine the initial position of the new
> >> source for smooth migration; it is not a reposition operation. More
> >> importantly, the "State" mentioned here refers to the start and end
> >> positions for reading the source.
> >>
> >> 3. This conversion should be an implementation detail of the next
> >> source, not of the converter function, in my opinion?
> >>
> >> The state conversion is of course an implementation detail and is part
> >> of the switching mechanism, which should provide users with an interface
> >> for the conversion, defined as the converter function. Moreover, once
> >> users have implemented "SwitchableSource" and added it to the Hybrid
> >> Source, they don't need to implement "SwitchableSource" again for a
> >> different conversion. From the user's perspective, users can define
> >> different converter functions and create the "SwitchableSource" to add
> >> to the "HybridSource"; there is no need to implement a Source for each
> >> converter function.
> >>
> >> 4. No configurable start position. In this situation, is the combination
> >> of the above three joints a no-op, and "HybridSource" just a chain of
> >> sources with pre-configured start positions?
> >>
> >> Indeed, there is no configurable start position yet, and this
> >> configuration could be included in the feature. Users could use the
> >> "SwitchableSplitEnumerator#setStartState" interface or configuration
> >> parameters to configure the start position.
> >>
> >> 5. I wonder whether the end position is a must, and how it could be
> >> useful for end users in a generic-enough source?
> >>
> >> The "getEndState" interface is used for the smooth migration scenario and
> >> could return null if it is not needed. In the Hybrid Source mechanism,
> >> this interface is required for switching between the constituent
> >> sources; otherwise there is no way to get the end position of the
> >> upstream source. In summary, the Hybrid Source needs to be able to set
> >> the start position and get the end position of each Source; otherwise
> >> there is no point in building a Hybrid Source.
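A rough sketch of the switchable interfaces as described in this answer. The names setStartState and getEndState come from the thread; the generic parameters, the toy enumerators and the Switcher helper are assumptions for illustration only, using a timestamp (Long) as both start and end state.

```java
import java.util.function.Function;

// Hypothetical shape of the switchable enumerator from the FLIP-150 draft.
// Method names follow the thread; generics and signatures are assumptions.
interface SwitchableSplitEnumerator<StartStateT, EndStateT> {
    // Sets the position from which this source starts reading.
    void setStartState(StartStateT startState);

    // Returns where this source stopped, or null if it has no end position.
    EndStateT getEndState();
}

// Toy bounded enumerator: remembers the largest timestamp it has processed.
final class ToyFileEnumerator implements SwitchableSplitEnumerator<Long, Long> {
    private Long lastTimestamp; // null until something was processed

    void process(long timestamp) {
        lastTimestamp = lastTimestamp == null ? timestamp : Math.max(lastTimestamp, timestamp);
    }

    @Override
    public void setStartState(Long startState) {
        // First source in the chain: its start position is fixed, nothing to do.
    }

    @Override
    public Long getEndState() {
        return lastTimestamp;
    }
}

// Toy unbounded enumerator: only needs a start position.
final class ToyKafkaEnumerator implements SwitchableSplitEnumerator<Long, Void> {
    private Long startTimestamp;

    @Override
    public void setStartState(Long startState) {
        this.startTimestamp = startState;
    }

    @Override
    public Void getEndState() {
        return null; // unbounded: no end position
    }

    Long startTimestamp() {
        return startTimestamp;
    }
}

final class Switcher {
    // Reads the upstream end state, converts it, and sets the downstream start state.
    static <E, S> void switchOver(SwitchableSplitEnumerator<?, E> upstream,
                                  SwitchableSplitEnumerator<S, ?> downstream,
                                  Function<E, S> converter) {
        E end = upstream.getEndState();
        if (end == null) {
            throw new IllegalStateException("upstream source has no end position");
        }
        downstream.setStartState(converter.apply(end));
    }
}
```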
> >>
> >> 6. Is it possible for the converter function to do blocking operations?
> >> How should we respond to a checkpoint request when switching split
> >> enumerators across sources? Does the end position or start position need
> >> to be stored in checkpoint state or not?
> >>
> >> The converter function only converts the state of the upstream source to
> >> the state of the downstream source; it does not perform blocking
> >> operations. The way to respond to a checkpoint request when switching
> >> split enumerators across sources is to send the corresponding
> >> "SourceEvent" for coordination. The end position or start position
> >> doesn't need to be stored in checkpoint state; the source only implements
> >> the "getEndState" interface for the end position.
> >>
> >> Best,
> >> Nicholas Jiang
> >>
> >>
> >
>

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Here is a summary of where we are at with the PR:

* Added capability to construct sources at switch time through a
factory interface. This can support all previously discussed
scenarios. The simple case (sources with fixed start position) is
still simple, but for scenarios that require deferred instantiation,
sources can now be created through their respective builders at switch
time with access to the previous enumerator. This is a modification of
option 3 described previously.

* There is now unit test coverage for reader and enumerator.

* Ideas such as a universal interface for exchange of start positions
can be added on top of the current implementation. However, I would
like to leave that as an exercise for the future and keep the scope of
this initial work contained.

* FLIP page will be updated to reflect the changes made since it was
originally created. Nicholas volunteered to take this up and also send
a VOTE thread.
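A sketch of the factory idea from the first bullet, assuming a simplified world where an enumerator only exposes an end timestamp. PreviousEnumerator, TimestampSource, SourceFactory and FactoryDemo are hypothetical names, not the HybridSource API; the sketch only shows how one factory type covers both the fixed-position case (ignore the previous enumerator) and the deferred case (build the next source from it).

```java
import java.util.function.Function;

// Hypothetical, simplified stand-ins; not the HybridSource API.
interface PreviousEnumerator {
    long endTimestamp();
}

record TimestampSource(String name, long startTimestamp) {}

// A factory builds the next source at switch time, given the previous enumerator.
interface SourceFactory extends Function<PreviousEnumerator, TimestampSource> {}

final class FactoryDemo {
    // Simple case: the source is built up-front; the previous enumerator is ignored.
    static SourceFactory fixed(TimestampSource source) {
        return previous -> source;
    }

    // Deferred case: build the next source from where the previous one stopped.
    static SourceFactory deferred(String name) {
        return previous -> new TimestampSource(name, previous.endTimestamp());
    }
}
```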

Thanks all and especially Arvid for taking the time to review and discuss.

Thomas

On Tue, Jun 15, 2021 at 11:01 AM Thomas Weise <th...@apache.org> wrote:
>
> Hi Arvid,
>
> Thanks for your reply -->
>
> On Mon, Jun 14, 2021 at 2:55 PM Arvid Heise <ar...@ververica.com> wrote:
> >
> > Hi Thomas,
> >
> > Thanks for bringing this up. I think this is a tough nut to crack :/.
> > Imho 1 and 3 or 1+3 can work but it is ofc a pity if the source implementor
> > is not aware of HybridSource. I'm also worried that we may not have a
> > universal interface to specify start offset/time.
> > I guess it also would be much easier if we would have an abstract base
> > source class where we could implement some basic support.
> >
> > When I initially looked at the issue I was thinking that sources should
> > always be immutable (we have some bad experiences with mutable life-cycles
> > in operator implementations) and the only modifiable thing should be the
> > builder. That would mean that a HybridSource actually just gets a list of
> > source builders and creates the sources when needed with the correct
> > start/end offset. However, we neither have base builders (something that
> > I'd like to change) nor are any of the builders serializable. We could
> > convert sources back to builders, update the start offset, and convert to
> > sources again but this also seems overly complicated. So I'm assuming that
> > we should go with modifiable sources as also expressed in the FLIP draft.
>
> The need to set a start position at runtime indicates that sources
> should not be immutable. I think it would be better to have a setter
> on the source that clearly describes the mutation.
>
> Regarding deferred construction of the sources (supplier pattern):
> This is actually a very interesting idea that would also help in
> situations where the exact sequence of sources isn't known upfront.
> However, Source is also the factory for split and enumerator
> checkpoint serializers. If we were to instantiate the source at switch
> time, we would also need to distribute the serializers at switch time.
> This would lead to even more complexity and move us further away from
> the original goal of having a relatively simple implementation for the
> basic scenarios.
>
> > If we could assume that we are always switching by time, we could also
> > change Source(Enumerator)#start to take the start time as a parameter. Can
> > we deduce the end time by the record timestamp? But I guess that has all
> > been discussed already, so sorry if I derail the discussion.
>
> This actually hasn't been discussed. The original proposal left the
> type of the start position open, which also makes it less attractive
> (user still has to supply a converter).
>
> For initial internal usage of the hybrid source, we are planning to
> use a timestamp. But there may be use cases where the start position
> could be encoded in other ways, such as based on Kafka offsets.
>
> > I'm also leaning towards extending the Source interface to include these
> > methods (with defaults) to make it harder for implementers to miss.
>
> It would be possible to introduce an optional interface as a follow-up
> task. It can be implemented as the default of option 3.
>
> >
> >
> > On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <th...@apache.org> wrote:
> >
> > > Thanks for the suggestions and feedback on the PR.
> > >
> > > A variation of hybrid source that can switch back and forth was
> > > brought up before and it is something that will be eventually
> > > required. It was also suggested by Stephan that in the future there
> > > may be more than one implementation of hybrid source for different
> > > requirements.
> > >
> > > I want to bring back the topic of how enumerator end state can be
> > > converted into start position from the PR [1]. We started in the FLIP
> > > page with "switchable" interfaces, the prototype had checkpoint
> > > conversion and now the PR has a function that allows augmenting the
> > > source. Each of these has pros and cons but we will need to converge.
> > >
> > > 1. Switchable interfaces
> > > * unified solution
> > > * requires sources to implement a special interface to participate in
> > > HybridSource, even when no dynamic conversion is needed
> > >
> > > 2. Checkpoint state
> > > * unified solution
> > > * no interface changes
> > > * requires implementation change to existing enumerators to include
> > > end state (like a timestamp) into their checkpoint state
> > > * existing sources work as is for fixed start position
> > >
> > > 3. Source modification at switch time to set start position
> > > * can be solved per source, least restrictive
> > > * no interface changes
> > > * requires enumerator to expose end state (as a getter) and source to
> > > be either mutable or source to be copied and augmented with the start
> > > position.
> > > * existing sources work as is for fixed start position
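Option 3 allows either mutating the source or copying it. The copying variant, which keeps sources immutable, could look roughly like this; KafkaLikeSource is a hypothetical record standing in for a real connector's source class.

```java
// Hypothetical immutable source description; not a real connector class.
record KafkaLikeSource(String topic, long startTimestamp) {
    // Copy-and-augment: returns a new source with the given start position
    // and leaves the original (as configured at job submission) untouched.
    KafkaLikeSource withStartTimestamp(long newStartTimestamp) {
        return new KafkaLikeSource(topic, newStartTimestamp);
    }
}
```

The trade-off is that the hybrid source must hold on to the copy it created at switch time, whereas a setter would mutate the instance that was registered up-front.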
> > >
> > > I think more eyes might help to finalize the approach.
> > >
> > > [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
> > >
> > > On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <st...@gmail.com> wrote:
> > > >
> > > > > hybrid sounds to me more like the source would constantly switch back and forth
> > > >
> > > > Initially, the focus of hybrid source is more like a sequenced chain.
> > > >
> > > > But in the future it would be cool if hybrid sources could intelligently
> > > > switch back and forth between a historical data source (like Iceberg)
> > > > and a live data source (like Kafka). E.g.,
> > > > - if the Flink job is lagging behind Kafka retention, automatically
> > > > switch to the Iceberg source
> > > > - once the job has caught up, switch back to the Kafka source
> > > >
> > > > That can simplify the operational aspects of manual switching.
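A possible shape for the back-and-forth decision, assuming the job's lag (processing time minus the event time it has reached) is known. BackAndForthPolicy and Mode are hypothetical names; the gap between the two thresholds (hysteresis) is a design choice of this sketch to avoid flapping between sources.

```java
// Hypothetical policy for switching between a historical source (e.g. Iceberg)
// and a live source (e.g. Kafka); not an existing Flink API.
enum Mode { HISTORICAL, LIVE }

final class BackAndForthPolicy {
    private final long retentionMillis; // lag at which Kafka no longer has the data
    private final long caughtUpMillis;  // lag at which it is safe to go back to Kafka

    BackAndForthPolicy(long retentionMillis, long caughtUpMillis) {
        if (caughtUpMillis >= retentionMillis) {
            throw new IllegalArgumentException("caughtUpMillis must be below retentionMillis");
        }
        this.retentionMillis = retentionMillis;
        this.caughtUpMillis = caughtUpMillis;
    }

    // Decides which source to read from, given the current mode and lag.
    Mode next(Mode current, long lagMillis) {
        if (current == Mode.LIVE && lagMillis >= retentionMillis) {
            return Mode.HISTORICAL; // fell behind retention: catch up from the table
        }
        if (current == Mode.HISTORICAL && lagMillis <= caughtUpMillis) {
            return Mode.LIVE; // caught up: switch back to the live stream
        }
        return current; // otherwise keep the current source (hysteresis)
    }
}
```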
> > > >
> > > >
> > > > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:
> > > >>
> > > >> Sorry for joining the party so late, but it's such an interesting FLIP
> > > >> with a huge impact that I wanted to add my 2 cents. [1]
> > > >> I'm mirroring some basic questions from the PR review to this thread
> > > >> because they are about the name:
> > > >>
> > > >> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> > > >> similar.
> > > >> Hybrid has the connotation of 2 for me (maybe because I'm a non-native
> > > >> speaker) and does not carry the concatenation concept as well (hybrid
> > > >> sounds to me more like the source would constantly switch back and
> > > >> forth).
> > > >>
> > > >> Could we take a few minutes to think about whether this is the most
> > > >> intuitive name for new users? I'm especially hoping that native
> > > >> speakers might give some ideas (or declare that Hybrid is perfect).
> > > >>
> > > >> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> > > >>
> > > >> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <st...@gmail.com> wrote:
> > > >>
> > > >> > > Converter function relies on the specific enumerator capabilities to set
> > > >> > > the new start position (e.g.
> > > >> > > fileSourceEnumerator.getEndTimestamp() and
> > > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > > >> >
> > > >> > I guess the premise is that a converter is for a specific tuple of
> > > >> > (upstream source, downstream source). We don't have to define generic
> > > >> > EndStateT and SwitchableEnumerator interfaces. That should work.
> > > >> >
> > > >> > The benefit of defining EndStateT and SwitchableEnumerator interfaces
> > > >> > is probably promoting uniformity across sources that support
> > > >> > hybrid/switchable sources.
> > > >> >
> > > >> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <th...@apache.org> wrote:
> > > >> >
> > > >> > > Hi Steven,
> > > >> > >
> > > >> > > Thank you for the thorough review of the PR and for bringing this
> > > >> > > back to the mailing list.
> > > >> > >
> > > >> > > All,
> > > >> > >
> > > >> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > > >> > > deviates from the original proposal [1]. The goal would be to update
> > > >> > > the FLIP soon and bring it to a vote, as previously suggested
> > > >> > > offline by Nicholas.
> > > >> > >
> > > >> > > A few minor issues in the PR are outstanding and I'm working on test
> > > >> > > coverage for the recovery behavior, which should be completed soon.
> > > >> > >
> > > >> > > The dynamic position transfer needs to be concluded before we can
> > > >> > > move forward, however.
> > > >> > >
> > > >> > > There have been various ideas, including the special
> > > >> > > "SwitchableEnumerator" interface, using enumerator checkpoint state,
> > > >> > > or an enumerator interface extension to extract the end state.
> > > >> > >
> > > >> > > One goal in the FLIP is to "Reuse the existing Source connectors
> > > >> > > built with FLIP-27 without any change." and I think it is important
> > > >> > > to honor that goal given that fixed start positions do not require
> > > >> > > interface changes.
> > > >> > >
> > > >> > > Based on the feedback, the following might be a good solution for
> > > >> > > runtime position transfer:
> > > >> > >
> > > >> > > * User supplies the optional converter function (not applicable for
> > > >> > > fixed positions).
> > > >> > > * Instead of relying on the enumerator checkpoint state [2], the
> > > >> > > converter function will be supplied with the current and next
> > > >> > > enumerator (source.createEnumerator).
> > > >> > > * Converter function relies on the specific enumerator capabilities
> > > >> > > to set the new start position (e.g.
> > > >> > > fileSourceEnumerator.getEndTimestamp() and
> > > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)).
> > > >> > > * HybridSourceSplitEnumerator starts new underlying enumerator
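The converter in this proposal receives both enumerators. A minimal sketch, assuming toy enumerators where getEndTimestamp follows the naming in the thread and setStartTimestamp stands in for something like setTimestampOffsetsInitializer(..); neither class is the real FileSource or KafkaSource enumerator, and EnumeratorSwitch is a hypothetical stand-in for what HybridSourceSplitEnumerator would do.

```java
import java.util.function.BiConsumer;

// Hypothetical toy enumerators; not the real FileSource/KafkaSource enumerators.
final class ToyFileSourceEnumerator {
    private final long endTimestamp;

    ToyFileSourceEnumerator(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }
}

final class ToyKafkaSourceEnumerator {
    private long startTimestamp = -1L; // -1 means "not set yet"
    private boolean started;

    // Stand-in for something like setTimestampOffsetsInitializer(..).
    void setStartTimestamp(long timestamp) {
        this.startTimestamp = timestamp;
    }

    long startTimestamp() {
        return startTimestamp;
    }

    boolean isStarted() {
        return started;
    }

    void start() {
        if (startTimestamp < 0) {
            throw new IllegalStateException("start position was not set");
        }
        started = true;
    }
}

final class EnumeratorSwitch {
    // The user-supplied converter sees the current and the next enumerator;
    // afterwards the next enumerator is started.
    static void switchTo(ToyFileSourceEnumerator current,
                         ToyKafkaSourceEnumerator next,
                         BiConsumer<ToyFileSourceEnumerator, ToyKafkaSourceEnumerator> converter) {
        converter.accept(current, next);
        next.start();
    }
}
```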
> > > >> > >
> > > >> > > With this approach, there is no need to augment FLIP-27 interfaces
> > > >> > > and custom source capabilities are easier to integrate. Removing the
> > > >> > > mandate to rely on enumerator checkpoint state also avoids potential
> > > >> > > upgrade/compatibility issues.
> > > >> > >
> > > >> > > Thoughts?
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Thomas
> > > >> > >
> > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > > >> > > [2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > > >> > >
> > > >> > >
> > > >> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <st...@gmail.com> wrote:
> > > >> > > >
> > > >> > > > Discussed the PR with Thomas offline. Thomas, please correct me if
> > > >> > > > I missed anything.
> > > >> > > >
> > > >> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > > >> > > > converter.
> > > >> > > > * Current PR uses the enumerator checkpoint state type as the input
> > > >> > > > for the converter
> > > >> > > > * FLIP-150 defines a new EndStateT interface.
> > > >> > > > It seems that the FLIP-150 approach of EndStateT is more flexible,
> > > >> > > > as the transition EndStateT doesn't have to be included in the
> > > >> > > > upstream source checkpoint state.
> > > >> > > >
> > > >> > > > Let's look at two use cases:
> > > >> > > > 1) Static cutover time at 5 pm. The file source reads all data
> > > >> > > > between 9 am and 5 pm, then the Kafka source starts with an initial
> > > >> > > > position of 5 pm. In this case, there is no need for a converter or
> > > >> > > > EndStateT since the starting time for the Kafka source is known and
> > > >> > > > fixed.
> > > >> > > > 2) Dynamic cutover time at 1 hour before now. This is useful when
> > > >> > > > the bootstrap of historic data takes a long time (like days or
> > > >> > > > weeks) and we don't know the exact time of cutover when a job is
> > > >> > > > launched. Instead, we are instructing the file source to stop when
> > > >> > > > it gets close to live data. In this case, hybrid source
> > > >> > > > construction will specify a relative time (now - 1 hour), and the
> > > >> > > > EndStateT (of the file source) will be resolved to an absolute time
> > > >> > > > for cutover. We probably don't need to include EndStateT (end
> > > >> > > > timestamp) in the file source checkpoint state. Hence, the separate
> > > >> > > > EndStateT is probably more desirable.
> > > >> > > >
> > > >> > > > We also discussed the converter for the Kafka source. Kafka source
> > > >> > > > supports different OffsetsInitializer impls (including
> > > >> > > > TimestampOffsetsInitializer). To support the dynamic cutover time
> > > >> > > > (use case #2 above), we can plug in a
> > > >> > > > SupplierTimestampOffsetInitializer, where the starting timestamp is
> > > >> > > > not set during source/job construction. Rather, it is a supplier
> > > >> > > > model where the starting timestamp value is set to the resolved
> > > >> > > > absolute timestamp during switch.
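The supplier model can be sketched with a LongSupplier whose value is resolved only at switch time. DeferredStartTimestamp and Cutover are hypothetical helpers standing in for SupplierTimestampOffsetInitializer and the relative-time resolution of use case 2; reading the value before the switch fails fast instead of silently using a wrong start position.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of the supplier idea: the Kafka start timestamp is not
// known at construction time and is read lazily once the switch happens.
final class DeferredStartTimestamp implements LongSupplier {
    private final AtomicLong resolved = new AtomicLong(Long.MIN_VALUE);

    // Called by the switching logic once the cutover time has been resolved.
    void resolve(long absoluteTimestampMillis) {
        if (!resolved.compareAndSet(Long.MIN_VALUE, absoluteTimestampMillis)) {
            throw new IllegalStateException("cutover already resolved");
        }
    }

    @Override
    public long getAsLong() {
        long ts = resolved.get();
        if (ts == Long.MIN_VALUE) {
            throw new IllegalStateException("cutover not resolved yet");
        }
        return ts;
    }
}

final class Cutover {
    // Use case 2: a relative cutover such as "now - 1 hour", resolved at switch time.
    static long resolveRelative(long nowMillis, long offsetMillis) {
        return nowMillis - offsetMillis;
    }
}
```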
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Steven
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <th...@apache.org> wrote:
> > > >> > > >
> > > >> > > > > Hi Nicholas,
> > > >> > > > >
> > > >> > > > > Thanks for taking a look at the PR!
> > > >> > > > >
> > > >> > > > > 1. Regarding switching mechanism:
> > > >> > > > >
> > > >> > > > > There has been previous discussion in this thread regarding the
> > > >> > > > > pros and cons of how the switching can be exposed to the user.
> > > >> > > > >
> > > >> > > > > With fixed start positions, no special switching interface to
> > > >> > > > > transfer information between enumerators is required. Sources are
> > > >> > > > > configured as they would be when used standalone and just plugged
> > > >> > > > > into HybridSource. I expect that to be a common use case. You can
> > > >> > > > > find an example for this in the ITCase:
> > > >> > > > >
> > > >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > > >> > > > >
> > > >> > > > > For dynamic start position, the checkpoint state is used to
> > > >> > > > > transfer information from old to new enumerator. An example for
> > > >> > > > > that can be found here:
> > > >> > > > >
> > > >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > > >> > > > >
> > > >> > > > > That may look verbose, but the code to convert from one state to
> > > >> > > > > another can be factored out into a utility and the function
> > > >> > > > > becomes a one-liner.
> > > >> > > > >
> > > >> > > > > For common sources like files and Kafka we can potentially (later)
> > > >> > > > > implement the conversion logic as part of the respective
> > > >> > > > > connector's checkpoint and split classes.
> > > >> > > > >
> > > >> > > > > I hope that with the PR up for review, we can soon reach a
> > > >> > > > > conclusion on how we want to expose this to the user.
> > > >> > > > >
> > > >> > > > > Following is an example for Files -> Files -> Kafka that I'm using
> > > >> > > > > for e2e testing. It exercises both ways of setting the start
> > > >> > > > > position.
> > > >> > > > >
> > > >> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > 2. Regarding the events used to implement the actual switch
> > > >> > > > > between enumerator and readers: I updated the PR with javadoc to
> > > >> > > > > clarify the intent. Please let me know if that helps, or shall we
> > > >> > > > > continue to discuss those details on the PR?
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Thomas
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programgeek@163.com> wrote:
> > > >> > > > > >
> > > >> > > > > > Hi Thomas,
> > > >> > > > > >
> > > >> > > > > >    Sorry for the late reply regarding your PoC. I have reviewed
> > > >> > > > > > the base abstract implementation in your pull request:
> > > >> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the
> > > >> > > > > > switching mechanism, this level of abstraction is not concise
> > > >> > > > > > enough and doesn't make connector contribution easier. In
> > > >> > > > > > theory, it is necessary to introduce a set of interfaces to
> > > >> > > > > > support the switching mechanism. The SwitchableSource and
> > > >> > > > > > SwitchableSplitEnumerator interfaces are needed for connector
> > > >> > > > > > extensibility.
> > > >> > > > > >    In other words, the whole switching process of the above
> > > >> > > > > > mentioned PR is different from that described in FLIP-150. In
> > > >> > > > > > the above implementation, the source reading switch is executed
> > > >> > > > > > after receiving the SwitchSourceEvent, which could happen before
> > > >> > > > > > the SourceReaderFinishEvent is sent. This timeline of the source
> > > >> > > > > > reading switch could be discussed here.
> > > >> > > > > >    @Stephan @Becket, if you are available, please help to review
> > > >> > > > > > the abstract implementation and compare it with the interfaces
> > > >> > > > > > described in FLIP-150.
> > > >> > > > > >
> > > >> > > > > > Thanks,
> > > >> > > > > > Nicholas Jiang
> > > >> > > > > >
> > > >> > > > >
> > > >> > >
> > > >> >
> > >
> >

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Hi Arvid,

Thanks for your reply -->

On Mon, Jun 14, 2021 at 2:55 PM Arvid Heise <ar...@ververica.com> wrote:
>
> Hi Thomas,
>
> Thanks for bringing this up. I think this is a tough nut to crack :/.
> Imho 1 and 3 or 1+3 can work but it is ofc a pity if the source implementor
> is not aware of HybridSource. I'm also worried that we may not have a
> universal interface to specify start offset/time.
> I guess it also would be much easier if we would have an abstract base
> source class where we could implement some basic support.
>
> When I initially looked at the issue I was thinking that sources should
> always be immutable (we have some bad experiences with mutable life-cycles
> in operator implementations) and the only modifiable thing should be the
> builder. That would mean that a HybridSource actually just gets a list of
> source builders and creates the sources when needed with the correct
> start/end offset. However, we neither have base builders (something that
> I'd like to change) nor are any of the builders serializable. We could
> convert sources back to builders, update the start offset, and convert to
> sources again but this also seems overly complicated. So I'm assuming that
> we should go with modifiable sources as also expressed in the FLIP draft.

The need to set a start position at runtime indicates that sources
should not be immutable. I think it would be better to have a setter
on the source that clearly describes the mutation.
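A minimal Java sketch of the setter idea, assuming a source whose start position can only be changed through one explicitly named method. `TimestampStartSource`, `setStartTimestamp` and the string-returning `createEnumerator()` are hypothetical stand-ins, not Flink APIs; the guard is one assumed way to keep the mutable life-cycle explicit by rejecting the mutation once an enumerator exists.

```java
public class MutableStartSourceSketch {

    /** Hypothetical source with an explicitly mutable start position (not a Flink class). */
    static final class TimestampStartSource {
        private final String topic;
        private Long startTimestampMs; // null until set at build time or at switch time
        private boolean enumeratorCreated;

        TimestampStartSource(String topic) {
            this.topic = topic;
        }

        /** The only mutation point; rejected once an enumerator has been created. */
        void setStartTimestamp(long startTimestampMs) {
            if (enumeratorCreated) {
                throw new IllegalStateException("start position must be set before createEnumerator()");
            }
            this.startTimestampMs = startTimestampMs;
        }

        /** Stand-in for Source#createEnumerator; returns a description instead of an enumerator. */
        String createEnumerator() {
            if (startTimestampMs == null) {
                throw new IllegalStateException("no start position set for " + topic);
            }
            enumeratorCreated = true;
            return "enumerator(" + topic + " @ " + startTimestampMs + ")";
        }
    }

    public static void main(String[] args) {
        TimestampStartSource kafka = new TimestampStartSource("events");
        // At switch time: apply the end timestamp reported by the previous source.
        kafka.setStartTimestamp(1_623_700_000_000L);
        System.out.println(kafka.createEnumerator()); // prints "enumerator(events @ 1623700000000)"
    }
}
```

Rejecting late calls keeps the window in which the source may change small and visible, which is the kind of life-cycle problem the quoted message worries about.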

Regarding deferred construction of the sources (supplier pattern):
This is actually a very interesting idea that would also help in
situations where the exact sequence of sources isn't known upfront.
However, Source is also the factory for split and enumerator
checkpoint serializers. If we were to instantiate the source at switch
time, we would also need to distribute the serializers at switch time.
This would lead to even more complexity and move us further away from
the original goal of having a relatively simple implementation for the
basic scenarios.
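The serializer constraint can be illustrated with a small sketch, assuming that each hybrid split is tagged with the index of its underlying source and that the hybrid serializer delegates to that source's serializer. Because a restored checkpoint may contain splits of any source in the chain, every delegate must exist when the serializer is built, which a purely deferred (supplier-based) construction would not provide. `SplitSerializer`, `TaggedSplit` and `DelegatingSerializer` are hypothetical simplifications, not Flink's `SimpleVersionedSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class HybridSplitSerializerSketch {

    /** Hypothetical, simplified per-source split serializer (splits are plain strings here). */
    interface SplitSerializer {
        byte[] serialize(String split) throws IOException;

        String deserialize(byte[] bytes) throws IOException;
    }

    /** A split tagged with the position of its source in the hybrid chain. */
    static final class TaggedSplit {
        final int sourceIndex;
        final String split;

        TaggedSplit(int sourceIndex, String split) {
            this.sourceIndex = sourceIndex;
            this.split = split;
        }
    }

    /**
     * Writes the source index, then the delegate's bytes. All delegates are passed in up
     * front because a restored checkpoint may contain splits of any source in the chain.
     */
    static final class DelegatingSerializer {
        private final List<SplitSerializer> perSource;

        DelegatingSerializer(List<SplitSerializer> perSource) {
            this.perSource = List.copyOf(perSource);
        }

        byte[] serialize(TaggedSplit tagged) throws IOException {
            byte[] inner = perSource.get(tagged.sourceIndex).serialize(tagged.split);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(tagged.sourceIndex);
                out.writeInt(inner.length);
                out.write(inner);
            }
            return bytes.toByteArray();
        }

        TaggedSplit deserialize(byte[] data) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                int index = in.readInt();
                byte[] inner = new byte[in.readInt()];
                in.readFully(inner);
                return new TaggedSplit(index, perSource.get(index).deserialize(inner));
            }
        }
    }

    /** Trivial UTF-8 serializer, for illustration only. */
    static final SplitSerializer UTF8 = new SplitSerializer() {
        @Override
        public byte[] serialize(String split) {
            return split.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    public static void main(String[] args) throws IOException {
        DelegatingSerializer serializer = new DelegatingSerializer(List.of(UTF8, UTF8));
        byte[] data = serializer.serialize(new TaggedSplit(1, "kafka-partition-0"));
        TaggedSplit restored = serializer.deserialize(data);
        System.out.println(restored.sourceIndex + " " + restored.split); // prints "1 kafka-partition-0"
    }
}
```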

> If we could assume that we are always switching by time, we could also
> change Source(Enumerator)#start to take the start time as a parameter. Can
> we deduce the end time by the record timestamp? But I guess that has all
> been discussed already, so sorry if I derail the discussion.

This actually hasn't been discussed. The original proposal left the
type of the start position open, which also makes it less attractive
(user still has to supply a converter).

For initial internal usage of the hybrid source, we are planning to
use a timestamp. But there may be use cases where the start position
could be encoded in other ways, such as based on Kafka offsets.
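Since the start position type is still open, one assumed way to keep it flexible is a small hierarchy with a timestamp variant and a Kafka-offsets variant, produced by a converter from the previous source's end state. The names `StartPosition`, `TimestampPosition`, `KafkaOffsetsPosition` and `Converter` are hypothetical and not part of Flink or the PR; this is a sketch, not a proposed API.

```java
import java.util.Map;

public class StartPositionSketch {

    /** Hypothetical marker type for "where the next source should begin". */
    interface StartPosition {}

    /** Start from records at or after this event time. */
    static final class TimestampPosition implements StartPosition {
        final long epochMillis;

        TimestampPosition(long epochMillis) {
            this.epochMillis = epochMillis;
        }

        @Override
        public String toString() {
            return "timestamp=" + epochMillis;
        }
    }

    /** Start from explicit offsets, keyed by partition number. */
    static final class KafkaOffsetsPosition implements StartPosition {
        final Map<Integer, Long> offsetsByPartition;

        KafkaOffsetsPosition(Map<Integer, Long> offsetsByPartition) {
            this.offsetsByPartition = Map.copyOf(offsetsByPartition);
        }

        @Override
        public String toString() {
            return "offsets=" + offsetsByPartition;
        }
    }

    /** Maps the previous source's end state to the next source's start position. */
    interface Converter<EndStateT> {
        StartPosition toStartPosition(EndStateT endState);
    }

    public static void main(String[] args) {
        // Timestamp-based switch: the file source's end timestamp becomes the Kafka start time.
        Converter<Long> byTime = TimestampPosition::new;
        System.out.println(byTime.toStartPosition(1_623_700_000_000L)); // prints "timestamp=1623700000000"

        // Offset-based switch: the end state already carries per-partition offsets.
        Converter<Map<Integer, Long>> byOffsets = KafkaOffsetsPosition::new;
        System.out.println(byOffsets.toStartPosition(Map.of(0, 42L))); // prints "offsets={0=42}"
    }
}
```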

> I'm also leaning towards extending the Source interface to include these
> methods (with defaults) to make it harder for implementers to miss.

It would be possible to introduce an optional interface as a follow-up
task. It can be implemented as the default of option 3.
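A minimal sketch of how an optional interface could act as the default for option 3, under the assumption that an explicit user function always wins. If the previous enumerator exposes an end timestamp and the next source accepts a start timestamp, the conversion is derived automatically. Otherwise nothing is changed, which suits fixed start positions. `EndTimestampAware`, `StartTimestampAware` and `resolveConverter` are hypothetical names.

```java
import java.util.Optional;
import java.util.function.BiConsumer;

public class OptionalSwitchInterfaceSketch {

    /** Hypothetical optional interface: an enumerator that can report where it stopped. */
    interface EndTimestampAware {
        long getEndTimestamp();
    }

    /** Hypothetical optional interface: a source that can be told where to start. */
    interface StartTimestampAware {
        void setStartTimestamp(long startTimestampMs);
    }

    /**
     * Picks the switch-time function (option 3): an explicit user function wins;
     * otherwise a default is derived when both sides implement the optional interfaces;
     * otherwise nothing is changed, which is correct for fixed start positions.
     */
    static <E, S> BiConsumer<E, S> resolveConverter(
            E previousEnumerator, S nextSource, Optional<BiConsumer<E, S>> userConverter) {
        if (userConverter.isPresent()) {
            return userConverter.get();
        }
        if (previousEnumerator instanceof EndTimestampAware && nextSource instanceof StartTimestampAware) {
            return (prev, next) -> ((StartTimestampAware) next)
                    .setStartTimestamp(((EndTimestampAware) prev).getEndTimestamp());
        }
        return (prev, next) -> { };
    }

    static final class FileEnumerator implements EndTimestampAware {
        @Override
        public long getEndTimestamp() {
            return 1_623_700_000_000L;
        }
    }

    static final class KafkaLikeSource implements StartTimestampAware {
        long startTimestampMs = -1;

        @Override
        public void setStartTimestamp(long startTimestampMs) {
            this.startTimestampMs = startTimestampMs;
        }
    }

    public static void main(String[] args) {
        FileEnumerator previous = new FileEnumerator();
        KafkaLikeSource next = new KafkaLikeSource();
        resolveConverter(previous, next, Optional.<BiConsumer<FileEnumerator, KafkaLikeSource>>empty())
                .accept(previous, next);
        System.out.println(next.startTimestampMs); // prints "1623700000000"
    }
}
```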

>
>
> On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <th...@apache.org> wrote:
>
> > Thanks for the suggestions and feedback on the PR.
> >
> > A variation of hybrid source that can switch back and forth was
> > brought up before and it is something that will eventually be
> > required. It was also suggested by Stephan that in the future there
> > may be more than one implementation of hybrid source for different
> > requirements.
> >
> > I want to bring back the topic of how enumerator end state can be
> > converted into start position from the PR [1]. We started in the FLIP
> > page with "switchable" interfaces, the prototype had checkpoint
> > conversion and now the PR has a function that allows augmenting the
> > source. Each of these has pros and cons but we will need to converge.
> >
> > 1. Switchable interfaces
> > * unified solution
> > * requires sources to implement a special interface to participate in
> > HybridSource, even when no dynamic conversion is needed
> >
> > 2. Checkpoint state
> > * unified solution
> > * no interface changes
> > * requires implementation change to existing enumerators to include
> > end state (like a timestamp) into their checkpoint state
> > * existing sources work as is for fixed start position
> >
> > 3. Source modification at switch time to set start position
> > * can be solved per source, least restrictive
> > * no interface changes
> > * requires enumerator to expose end state (as a getter) and source to
> > be either mutable or source to be copied and augmented with the start
> > position.
> > * existing sources work as is for fixed start position
> >
> > I think more eyes might help to finalize the approach.
> >
> > [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
> >
> > On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <st...@gmail.com> wrote:
> > >
> > > > hybrid sounds to me more like the source would constantly switch back
> > > > and forth
> > >
> > > Initially, the focus of hybrid source is more like a sequenced chain.
> > >
> > > But in the future it would be cool if hybrid sources could intelligently
> > > switch back and forth between a historical data source (like Iceberg) and
> > > a live data source (like Kafka). E.g.,
> > > - if the Flink job is lagging behind Kafka retention, automatically
> > > switch to the Iceberg source
> > > - once the job has caught up, switch back to the Kafka source
> > >
> > > That can simplify the operational aspects of manually switching.
> > >
> > >
> > > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:
> > >>
> > >> Sorry for joining the party so late, but it's such an interesting FLIP
> > >> with a huge impact that I wanted to add my 2 cents. [1]
> > >> I'm mirroring some basic questions from the PR review to this thread
> > >> because they are about the name:
> > >>
> > >> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> > >> similar.
> > >> Hybrid has the connotation of 2 for me (maybe because I'm a non-native
> > >> speaker) and does not carry the concatenation concept as well (hybrid
> > >> sounds to me more like the source would constantly switch back and
> > >> forth).
> > >>
> > >> Could we take a few minutes to think about whether this is the most
> > >> intuitive name for new users? I'm especially hoping that native speakers
> > >> might give some ideas (or declare that Hybrid is perfect).
> > >>
> > >> [1]
> > >> https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> > >>
> > >> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <st...@gmail.com> wrote:
> > >>
> > >> > > Converter function relies on the specific enumerator capabilities
> > >> > > to set the new start position (e.g.
> > >> > > fileSourceEnumerator.getEndTimestamp() and
> > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > >> >
> > >> > I guess the premise is that a converter is for a specific tuple of
> > >> > (upstream source, downstream source). We don't have to define generic
> > >> > EndStateT and SwitchableEnumerator interfaces. That should work.
> > >> >
> > >> > The benefit of defining EndStateT and SwitchableEnumerator interfaces
> > >> > is probably promoting uniformity across sources that support
> > >> > hybrid/switchable source.
> > >> >
> > >> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <th...@apache.org> wrote:
> > >> >
> > >> > > Hi Steven,
> > >> > >
> > >> > > Thank you for the thorough review of the PR and for bringing this back
> > >> > > to the mailing list.
> > >> > >
> > >> > > All,
> > >> > >
> > >> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > >> > > deviates from the original proposal [1]. The goal would be to update
> > >> > > the FLIP soon and bring it to a vote, as previously suggested offline
> > >> > > by Nicholas.
> > >> > >
> > >> > > A few minor issues in the PR are outstanding and I'm working on test
> > >> > > coverage for the recovery behavior, which should be completed soon.
> > >> > >
> > >> > > The dynamic position transfer needs to be concluded before we can move
> > >> > > forward, however.
> > >> > >
> > >> > > There have been various ideas, including the special
> > >> > > "SwitchableEnumerator" interface, using enumerator checkpoint state, or
> > >> > > an enumerator interface extension to extract the end state.
> > >> > >
> > >> > > One goal in the FLIP is to "Reuse the existing Source connectors built
> > >> > > with FLIP-27 without any change." and I think it is important to honor
> > >> > > that goal given that fixed start positions do not require interface
> > >> > > changes.
> > >> > >
> > >> > > Based on the feedback, the following might be a good solution for
> > >> > > runtime position transfer:
> > >> > >
> > >> > > * User supplies the optional converter function (not applicable for
> > >> > > fixed positions).
> > >> > > * Instead of relying on the enumerator checkpoint state [2], the
> > >> > > converter function will be supplied with the current and next
> > >> > > enumerator (source.createEnumerator).
> > >> > > * Converter function relies on the specific enumerator capabilities to
> > >> > > set the new start position (e.g.
> > >> > > fileSourceEnumerator.getEndTimestamp() and
> > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)).
> > >> > > * HybridSourceSplitEnumerator starts the new underlying enumerator.
> > >> > >
> > >> > > With this approach, there is no need to augment FLIP-27 interfaces and
> > >> > > custom source capabilities are easier to integrate. Removing the
> > >> > > mandate to rely on enumerator checkpoint state also avoids potential
> > >> > > upgrade/compatibility issues.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > > [1]
> > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > >> > > [2]
> > >> > > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >> > >
> > >> > >
> > >> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <st...@gmail.com> wrote:
> > >> > > >
> > >> > > > I discussed the PR with Thomas offline. Thomas, please correct me if I
> > >> > > > missed anything.
> > >> > > >
> > >> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > >> > > > converter:
> > >> > > > * The current PR uses the enumerator checkpoint state type as the
> > >> > > > input for the converter.
> > >> > > > * FLIP-150 defines a new EndStateT interface.
> > >> > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > >> > > > the transition EndStateT doesn't have to be included in the upstream
> > >> > > > source checkpoint state.
> > >> > > >
> > >> > > > Let's look at two use cases:
> > >> > > > 1) Static cutover time at 5 pm. The file source reads all data
> > >> > > > between 9 am and 5 pm, then the Kafka source starts with an initial
> > >> > > > position of 5 pm. In this case, there is no need for a converter or
> > >> > > > EndStateT since the starting time for the Kafka source is known and
> > >> > > > fixed.
> > >> > > > 2) Dynamic cutover time at 1 hour before now. This is useful when the
> > >> > > > bootstrap of historic data takes a long time (like days or weeks) and
> > >> > > > we don't know the exact time of cutover when a job is launched.
> > >> > > > Instead, we are instructing the file source to stop when it gets close
> > >> > > > to live data. In this case, hybrid source construction will specify a
> > >> > > > relative time (now - 1 hour), and the EndStateT (of the file source)
> > >> > > > will be resolved to an absolute time for cutover. We probably don't
> > >> > > > need to include EndStateT (end timestamp) in the file source
> > >> > > > checkpoint state. Hence, the separate EndStateT is probably more
> > >> > > > desirable.
> > >> > > >
> > >> > > > We also discussed the converter for the Kafka source. The Kafka source
> > >> > > > supports different OffsetsInitializer impls (including
> > >> > > > TimestampOffsetsInitializer). To support the dynamic cutover time
> > >> > > > (use case #2 above), we can plug in a
> > >> > > > SupplierTimestampOffsetInitializer, where the starting timestamp is
> > >> > > > not set during source/job construction. Rather, it is a supplier
> > >> > > > model where the starting timestamp value is set to the resolved
> > >> > > > absolute timestamp during the switch.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Steven
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <th...@apache.org> wrote:
> > >> > > >
> > >> > > > > Hi Nicholas,
> > >> > > > >
> > >> > > > > Thanks for taking a look at the PR!
> > >> > > > >
> > >> > > > > 1. Regarding the switching mechanism:
> > >> > > > >
> > >> > > > > There has been previous discussion in this thread regarding the pros
> > >> > > > > and cons of how the switching can be exposed to the user.
> > >> > > > >
> > >> > > > > With fixed start positions, no special switching interface to
> > >> > > > > transfer information between enumerators is required. Sources are
> > >> > > > > configured as they would be when used standalone and just plugged
> > >> > > > > into HybridSource. I expect that to be a common use case. You can
> > >> > > > > find an example for this in the ITCase:
> > >> > > > >
> > >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > >> > > > >
> > >> > > > > For dynamic start positions, the checkpoint state is used to
> > >> > > > > transfer information from the old to the new enumerator. An example
> > >> > > > > for that can be found here:
> > >> > > > >
> > >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > >> > > > >
> > >> > > > > That may look verbose, but the code to convert from one state to
> > >> > > > > another can be factored out into a utility and the function becomes
> > >> > > > > a one-liner.
> > >> > > > >
> > >> > > > > For common sources like files and Kafka, we can potentially (later)
> > >> > > > > implement the conversion logic as part of the respective connector's
> > >> > > > > checkpoint and split classes.
> > >> > > > >
> > >> > > > > I hope that with the PR up for review, we can soon reach a
> > >> > > > > conclusion on how we want to expose this to the user.
> > >> > > > >
> > >> > > > > Following is an example for Files -> Files -> Kafka that I'm using
> > >> > > > > for e2e testing. It exercises both ways of setting the start
> > >> > > > > position.
> > >> > > > >
> > >> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > >> > > > >
> > >> > > > >
> > >> > > > > 2. Regarding the events used to implement the actual switch between
> > >> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
> > >> > > > > intent. Please let me know if that helps, or should we continue to
> > >> > > > > discuss those details on the PR?
> > >> > > > >
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Thomas
> > >> > > > >
> > >> > > > >
> > >> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programgeek@163.com>
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > Hi Thomas,
> > >> > > > > >
> > >> > > > > >    Sorry for the late reply regarding your POC. I have reviewed the
> > >> > > > > > base abstract implementation of your pull request:
> > >> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > >> > > > > > mechanism, this level of abstraction is not concise enough, which
> > >> > > > > > doesn't make connector contribution easier. In theory, it is
> > >> > > > > > necessary to introduce a set of interfaces to support the switching
> > >> > > > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> > >> > > > > > interfaces are needed for connector extensibility.
> > >> > > > > >    In other words, the whole switching process of the above-mentioned
> > >> > > > > > PR is different from that mentioned in FLIP-150. In the above
> > >> > > > > > implementation, the source reading switching is executed after
> > >> > > > > > receiving the SwitchSourceEvent, which could be before sending the
> > >> > > > > > SourceReaderFinishEvent. This timeline of source reading switching
> > >> > > > > > could be discussed here.
> > >> > > > > >    @Stephan @Becket, if you are available, please help to review
> > >> > > > > > the abstract implementation and compare it with the interfaces
> > >> > > > > > mentioned in FLIP-150.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Nicholas Jiang
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >> >
> >
>

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Arvid Heise <ar...@ververica.com>.
Hi Thomas,

Thanks for bringing this up. I think this is a tough nut to crack :/.
Imho 1 and 3 or 1+3 can work but it is ofc a pity if the source implementor
is not aware of HybridSource. I'm also worried that we may not have a
universal interface to specify start offset/time.
I guess it also would be much easier if we would have an abstract base
source class where we could implement some basic support.

When I initially looked at the issue I was thinking that sources should
always be immutable (we have some bad experiences with mutable life-cycles
in operator implementations) and the only modifiable thing should be the
builder. That would mean that a HybridSource actually just gets a list of
source builders and creates the sources when needed with the correct
start/end offset. However, we neither have base builders (something that
I'd like to change) nor are any of the builders serializable. We could
convert sources back to builders, update the start offset, and convert to
sources again but this also seems overly complicated. So I'm assuming that
we should go with modifiable sources as also expressed in the FLIP draft.
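The builder idea could look roughly like the following Java sketch: instead of mutable sources, the hybrid source holds a chain of factories that create an immutable source once the start position is known. Making the factory interface extend `Serializable` is one assumed way around the non-serializable builders, since lambdas targeting a serializable interface are serializable themselves (as long as what they capture is). `ImmutableSource` and `SourceFactory` are hypothetical. This alone does not address how split and checkpoint serializers would be available before the source exists.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

public class SourceFactorySketch {

    /** Hypothetical immutable source: the start position is fixed at construction. */
    static final class ImmutableSource implements Serializable {
        final String name;
        final long startTimestampMs;

        ImmutableSource(String name, long startTimestampMs) {
            this.name = name;
            this.startTimestampMs = startTimestampMs;
        }

        @Override
        public String toString() {
            return name + "@" + startTimestampMs;
        }
    }

    /** Serializable factory: lambdas targeting this interface are serializable as well. */
    interface SourceFactory extends Serializable {
        ImmutableSource create(long startTimestampMs);
    }

    /** Returns the serialized size, to show the chain can travel with the job graph. */
    static int serializedSize(List<SourceFactory> chain) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (SourceFactory factory : chain) {
                out.writeObject(factory);
            }
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        List<SourceFactory> chain = List.of(
                start -> new ImmutableSource("files", start),
                start -> new ImmutableSource("kafka", start));

        System.out.println("serialized bytes: " + serializedSize(chain));

        // At switch time, the next source is created with the resolved start position.
        long previousEnd = 1_623_700_000_000L;
        System.out.println(chain.get(1).create(previousEnd)); // prints "kafka@1623700000000"
    }
}
```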

If we could assume that we are always switching by time, we could also
change Source(Enumerator)#start to take the start time as a parameter. Can
we deduce the end time by the record timestamp? But I guess that has all
been discussed already, so sorry if I derail the discussion.
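If switching is always by time, the two ideas in this paragraph can be combined as in this minimal sketch. Readers report the largest record timestamp they emitted, the end time of the finished source is taken as the maximum over readers, and the next enumerator's `start` receives that value. `TimeStartedEnumerator` and `deduceEndTimestamp` are hypothetical. Using the maximum assumes the historical source covered all data up to that timestamp; if some partitions ended earlier, records between their last timestamp and the maximum would be skipped, while using the minimum would instead risk duplicates.

```java
import java.util.Arrays;

public class TimeBasedSwitchSketch {

    /** Hypothetical enumerator variant whose start() receives the start time directly. */
    interface TimeStartedEnumerator {
        void start(long startTimestampMs);
    }

    /**
     * End time of the finished source, deduced from record timestamps: the largest
     * timestamp any reader emitted. Assumes every reader has reported its maximum.
     */
    static long deduceEndTimestamp(long[] maxTimestampPerReader) {
        return Arrays.stream(maxTimestampPerReader)
                .max()
                .orElseThrow(() -> new IllegalStateException("no reader reported a timestamp"));
    }

    public static void main(String[] args) {
        long end = deduceEndTimestamp(new long[] {1_000L, 1_500L, 1_200L});
        TimeStartedEnumerator next = start -> System.out.println("next source starts at " + start);
        next.start(end); // prints "next source starts at 1500"
    }
}
```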

I'm also leaning towards extending the Source interface to include these
methods (with defaults) to make it harder for implementers to miss.
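Extending the interface with defaulted methods could look like this minimal Java sketch. Existing sources compile unchanged and keep working with fixed start positions, while a source that supports dynamic switching overrides the defaults. `SketchSource` is a hypothetical, trimmed-down interface, not Flink's `Source`, and the method names are assumptions. Throwing from the default setter is one design choice that makes a missing override fail loudly rather than silently ignoring the start position.

```java
public class DefaultMethodsSketch {

    /** Hypothetical, trimmed-down source interface (not Flink's Source). */
    interface SketchSource {
        String name();

        /** Existing sources inherit "fixed start position only". */
        default boolean supportsDynamicStartPosition() {
            return false;
        }

        /** Fails loudly so a source that forgot to override it is noticed at switch time. */
        default void setStartTimestamp(long startTimestampMs) {
            throw new UnsupportedOperationException(name() + " does not accept a start timestamp");
        }
    }

    /** An existing source: compiles unchanged and keeps working with fixed positions. */
    static final class LegacySource implements SketchSource {
        @Override
        public String name() {
            return "legacy";
        }
    }

    /** A source that opts into dynamic switching by overriding the defaults. */
    static final class SwitchAwareSource implements SketchSource {
        long start = -1;

        @Override
        public String name() {
            return "switch-aware";
        }

        @Override
        public boolean supportsDynamicStartPosition() {
            return true;
        }

        @Override
        public void setStartTimestamp(long startTimestampMs) {
            start = startTimestampMs;
        }
    }

    public static void main(String[] args) {
        for (SketchSource source : new SketchSource[] {new LegacySource(), new SwitchAwareSource()}) {
            // prints "legacy dynamic=false", then "switch-aware dynamic=true"
            System.out.println(source.name() + " dynamic=" + source.supportsDynamicStartPosition());
        }
    }
}
```

Compared with a separate optional interface, defaults make the hook visible on every source, at the cost of widening the core interface.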


On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <th...@apache.org> wrote:

> Thanks for the suggestions and feedback on the PR.
>
> A variation of hybrid source that can switch back and forth was
> brought up before and it is something that will be eventually
> required. It was also suggested by Stephan that in the future there
> may be more than one implementation of hybrid source for different
> requirements.
>
> I want to bring back the topic of how enumerator end state can be
> converted into start position from the PR [1]. We started in the FLIP
> page with "switchable" interfaces, the prototype had checkpoint
> conversion and now the PR has a function that allows to augment the
> source. Each of these has pros and cons but we will need to converge.
>
> 1. Switchable interfaces
> * unified solution
> * requires sources to implement a special interface to participate in
> HybridSource, even when no dynamic conversion is needed
>
> 2. Checkpoint state
> * unified solution
> * no interface changes
> * requires implementation change to existing enumerators to include
> end state (like a timestamp) into their checkpoint state
> * existing sources work as is for fixed start position
>
> 3. Source modification at switch time to set start position
> * can be solved per source, least restrictive
> * no interface changes
> * requires enumerator to expose end state (as a getter) and source to
> be either mutable or source to be copied and augmented with the start
> position.
> * existing sources work as is for fixed start position
>
> I think more eyes might help to finalize the approach.
>
> [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
>
> On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <st...@gmail.com> wrote:
> >
> > > hybrid sounds to me more like the source would constantly switch back
> and forth
> >
> > Initially, the focus of hybrid source is more like a sequenced chain.
> >
> > But in the future it would be cool that hybrid sources can intelligently
> switch back and forth between historical data source (like Iceberg) and
> live data source (like Kafka). E.g.,
> > - if the Flink job is lagging behind Kafka retention, automatically
> switch to Iceberg source
> > - once job caught up, switch back to Kafka source
> >
> > That can simplify operational aspects of manually switching.
> >
> >
> > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:
> >>
> >> Sorry for joining the party so late, but it's such an interesting FLIP
> with
> >> a huge impact that I wanted to add my 2 cents. [1]
> >> I'm mirroring some basic question from the PR review to this thread
> because
> >> it's about the name:
> >>
> >> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> >> similar.
> >> Hybrid has the connotation of 2 for me (maybe because I'm a non-native)
> and
> >> does not carry the concatentation concept as well (hybrid sounds to me
> more
> >> like the source would constantly switch back and forth).
> >>
> >> Could we take a few minutes to think if this is the most intuitive name
> for
> >> new users? I'm especially hoping that natives might give some ideas (or
> >> declare that Hybrid is perfect).
> >>
> >> [1]
> https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> >>
> >> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <st...@gmail.com> wrote:
> >>
> >> > > Converter function relies on the specific enumerator capabilities
> to set
> >> > the new start position (e.g.
> >> > fileSourceEnumerator.getEndTimestamp() and
> >> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> >> >
> >> > I guess the premise is that a converter is for a specific tuple of
> >> > (upstream source, downstream source) . We don't have to define generic
> >> > EndtStateT and SwitchableEnumerator interfaces. That should work.
> >> >
> >> > The benefit of defining EndtStateT and SwitchableEnumerator
> interfaces is
> >> > probably promoting uniformity across sources that support
> hybrid/switchable
> >> > source.
> >> >
> >> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <th...@apache.org> wrote:
> >> >
> >> > > Hi Steven,
> >> > >
> >> > > Thank you for the thorough review of the PR and for bringing this
> back
> >> > > to the mailing list.
> >> > >
> >> > > All,
> >> > >
> >> > > I updated the FLIP-150 page to highlight aspects in which the PR
> >> > > deviates from the original proposal [1]. The goal would be to update
> >> > > the FLIP soon and bring it to a vote, as previously suggested
> offline
> >> > > by Nicholas.
> >> > >
> >> > > A few minor issues in the PR are outstanding and I'm working on test
> >> > > coverage for the recovery behavior, which should be completed soon.
> >> > >
> >> > > The dynamic position transfer needs to be concluded before we can
> move
> >> > > forward however.
> >> > >
> >> > > There have been various ideas, including the special
> >> > > "SwitchableEnumerator" interface, using enumerator checkpoint state
> or
> >> > > an enumerator interface extension to extract the end state.
> >> > >
> >> > > One goal in the FLIP is to "Reuse the existing Source connectors
> built
> >> > > with FLIP-27 without any change." and I think it is important to
> honor
> >> > > that goal given that fixed start positions do not require interface
> >> > > changes.
> >> > >
> >> > > Based on the feedback the following might be a good solution for
> >> > > runtime position transfer:
> >> > >
> >> > > * User supplies the optional converter function (not applicable for
> >> > > fixed positions).
> >> > > * Instead of relying on the enumerator checkpoint state [2], the
> >> > > converter function will be supplied with the current and next
> >> > > enumerator (source.createEnumerator).
> >> > > * Converter function relies on the specific enumerator capabilities
> to
> >> > > set the new start position (e.g.
> >> > > fileSourceEnumerator.getEndTimestamp() and
> >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> >> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> >> > >
> >> > > With this approach, there is no need to augment FLIP-27 interfaces
> and
> >> > > custom source capabilities are easier to integrate. Removing the
> >> > > mandate to rely on enumerator checkpoint state also avoids potential
> >> > > upgrade/compatibility issues.
> >> > >
> >> > > Thoughts?
> >> > >
> >> > > Thanks,
> >> > > Thomas
> >> > >
> >> > > [1]
> >> > >
> >> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> >> > > [2]
> >> > >
> >> >
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> >> > >
> >> > >
> >> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <st...@gmail.com>
> wrote:
> >> > > >
> >> > > > discussed the PR with Thosmas offline. Thomas, please correct me
> if I
> >> > > > missed anything.
> >> > > >
> >> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> >> > converter.
> >> > > > * Current PR uses the enumerator checkpoint state type as the
> input for
> >> > > the
> >> > > > converter
> >> > > > * FLIP-150 defines a new EndStateT interface.
> >> > > > It seems that the FLIP-150 approach of EndStateT is more
> flexible, as
> >> > > > transition EndStateT doesn't have to be included in the upstream
> source
> >> > > > checkpoint state.
> >> > > >
> >> > > > Let's look at two use cases:
> >> > > > 1) static cutover time at 5 pm. File source reads all data btw 9
> am - 5
> >> > > pm,
> >> > > > then Kafka source starts with initial position of 5 pm. In this
> case,
> >> > > there
> >> > > > is no need for converter or EndStateT since the starting time for
> Kafka
> >> > > > source is known and fixed.
> >> > > > 2) dynamic cutover time at 1 hour before now. This is useful when
> the
> >> > > > bootstrap of historic data takes a long time (like days or weeks)
> and
> >> > we
> >> > > > don't know the exact time of cutover when a job is launched.
> Instead,
> >> > we
> >> > > > are instructing the file source to stop when it gets close to live
> >> > data.
> >> > > In
> >> > > > this case, hybrid source construction will specify a relative
> time (now
> >> > > - 1
> >> > > > hour), the EndStateT (of file source) will be resolved to an
> absolute
> >> > > time
> >> > > > for cutover. We probably don't need to include EndStateT (end
> >> > timestamp)
> >> > > as
> >> > > > the file source checkpoint state. Hence, the separate EndStateT is
> >> > > probably
> >> > > > more desirable.
> >> > > >
> >> > > > We also discussed the converter for the Kafka source. Kafka source
> >> > > supports
> >> > > > different OffsetsInitializer impls (including
> >> > > TimestampOffsetsInitializer).
> >> > > > To support the dynamic cutover time (use case #2 above), we can
> plug
> >> > in a
> >> > > > SupplierTimestampOffsetInitializer, where the starting timestamp
> is not
> >> > > set
> >> > > > during source/job construction. Rather it is a supplier model
> where the
> >> > > > starting timestamp value is set to the resolved absolute timestamp
> >> > during
> >> > > > switch.
> >> > > >
> >> > > > Thanks,
> >> > > > Steven
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <th...@apache.org>
> wrote:
> >> > > >
> >> > > > > Hi Nicholas,
> >> > > > >
> >> > > > > Thanks for taking a look at the PR!
> >> > > > >
> >> > > > > 1. Regarding switching mechanism:
> >> > > > >
> >> > > > > There has been previous discussion in this thread regarding the
> pros
> >> > > > > and cons of how the switching can be exposed to the user.
> >> > > > >
> >> > > > > With fixed start positions, no special switching interface to
> >> > transfer
> >> > > > > information between enumerators is required. Sources are
> configured
> >> > as
> >> > > > > they would be when used standalone and just plugged into
> >> > HybridSource.
> >> > > > > I expect that to be a common use case. You can find an example
> for
> >> > > > > this in the ITCase:
> >> > > > >
> >> > > > >
> >> > > > >
> >> > >
> >> >
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> >> > > > >
> >> > > > > For dynamic start position, the checkpoint state is used to
> transfer
> >> > > > > information from old to new enumerator. An example for that can
> be
> >> > > > > found here:
> >> > > > >
> >> > > > >
> >> > > > >
> >> > >
> >> >
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> >> > > > >
> >> > > > > That may look verbose, but the code to convert from one state to
> >> > > > > another can be factored out into a utility and the function
> becomes a
> >> > > > > one-liner.
> >> > > > >
> >> > > > > For common sources like files and Kafka we can potentially
> (later)
> >> > > > > implement the conversion logic as part of the respective
> connector's
> >> > > > > checkpoint and split classes.
> >> > > > >
> >> > > > > I hope that with the PR up for review, we can soon reach a
> conclusion
> >> > > > > on how we want to expose this to the user.
> >> > > > >
> >> > > > > Following is an example for Files -> Files -> Kafka that I'm
> using
> >> > for
> >> > > > > e2e testing. It exercises both ways of setting the start
> position.
> >> > > > >
> >> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> >> > > > >
> >> > > > >
> >> > > > > 2. Regarding the events used to implement the actual switch
> between
> >> > > > > enumerator and readers: I updated the PR with javadoc to
> clarify the
> >> > > > > intent. Please let me know if that helps or let's continue to
> discuss
> >> > > > > those details on the PR?
> >> > > > >
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Thomas
> >> > > > >
> >> > > > >
> >> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programgeek@163.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > Hi Thomas,
> >> > > > > >
> >> > > > > >    Sorry for the late reply on your POC. I have reviewed the base
> >> > > > > > abstract implementation of your pull request:
> >> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> >> > > > > > mechanism, this level of abstraction is not concise enough and
> >> > > > > > doesn't make connector contribution easier. In theory, it is
> >> > > > > > necessary to introduce a set of interfaces to support the switching
> >> > > > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> >> > > > > > interfaces are needed for connector extensibility.
> >> > > > > >    In other words, the whole switching process of the above-mentioned
> >> > > > > > PR is different from that described in FLIP-150. In the above
> >> > > > > > implementation, the source reading switch is executed after
> >> > > > > > receiving the SwitchSourceEvent, which could happen before the
> >> > > > > > SourceReaderFinishEvent is sent. This timeline of the source reading
> >> > > > > > switch could be discussed here.
> >> > > > > >    @Stephan @Becket, if you are available, please help to review the
> >> > > > > > abstract implementation and compare it with the interfaces mentioned
> >> > > > > > in FLIP-150.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Nicholas Jiang
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >> > > > >
> >> > >
> >> >
>


-- 

Arvid Heise | Senior Java Developer

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Thanks for the suggestions and feedback on the PR.

A variation of the hybrid source that can switch back and forth was
brought up before, and it is something that will eventually be
required. It was also suggested by Stephan that in the future there
may be more than one implementation of the hybrid source for different
requirements.

I want to bring back the topic of how the enumerator end state can be
converted into a start position, which came up on the PR [1]. We started
on the FLIP page with "switchable" interfaces, the prototype had checkpoint
conversion, and now the PR has a function that allows augmenting the
source. Each of these has pros and cons, but we will need to converge.

1. Switchable interfaces
* unified solution
* requires sources to implement a special interface to participate in
HybridSource, even when no dynamic conversion is needed

2. Checkpoint state
* unified solution
* no interface changes
* requires an implementation change to existing enumerators to include
the end state (like a timestamp) in their checkpoint state
* existing sources work as is for fixed start position

3. Source modification at switch time to set start position
* can be solved per source, least restrictive
* no interface changes
* requires the enumerator to expose its end state (as a getter) and the
source to be either mutable or copied and augmented with the start
position
* existing sources work as is for fixed start position
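
A minimal, framework-free Java sketch of option 3 follows. UpstreamEnumeratorSketch, DownstreamSourceSketch and SourceSwitchSketch.prepareNext are hypothetical names, not Flink APIs. They only model the idea that the finished enumerator exposes its end state through a getter and that the next source is copied with that value as its start position, while a source with a fixed start position skips the converter entirely.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical stand-in for an upstream (e.g. file) enumerator that has
// finished and exposes its end state through a plain getter.
final class UpstreamEnumeratorSketch {
    private final Instant endTimestamp;

    UpstreamEnumeratorSketch(Instant endTimestamp) {
        this.endTimestamp = Objects.requireNonNull(endTimestamp);
    }

    Instant getEndTimestamp() {
        return endTimestamp;
    }
}

// Hypothetical stand-in for a downstream (e.g. Kafka) source. It is immutable,
// so the start position is applied by returning an augmented copy.
final class DownstreamSourceSketch {
    private final String topic;
    private final Instant startTimestamp; // null means "not set yet"

    DownstreamSourceSketch(String topic, Instant startTimestamp) {
        this.topic = Objects.requireNonNull(topic);
        this.startTimestamp = startTimestamp;
    }

    DownstreamSourceSketch withStartTimestamp(Instant ts) {
        return new DownstreamSourceSketch(topic, ts);
    }

    String getTopic() {
        return topic;
    }

    Instant getStartTimestamp() {
        return startTimestamp;
    }
}

final class SourceSwitchSketch {
    private SourceSwitchSketch() {}

    // Applies the optional user-supplied converter at switch time. With a fixed
    // start position no converter is given and the next source is used as is.
    static <E, S> S prepareNext(E finishedEnumerator, S nextSource, BiFunction<E, S, S> converter) {
        return converter == null ? nextSource : converter.apply(finishedEnumerator, nextSource);
    }
}
```

With this shape, the converter for one (upstream, downstream) pair stays a one-liner such as (prev, src) -> src.withStartTimestamp(prev.getEndTimestamp()), and no shared end-state interface is needed.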

I think more eyes might help to finalize the approach.

[1] https://github.com/apache/flink/pull/15924#discussion_r649929865

On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <st...@gmail.com> wrote:
>
> > hybrid sounds to me more like the source would constantly switch back and forth
>
> Initially, the focus of hybrid source is more like a sequenced chain.
>
> But in the future it would be cool if hybrid sources could intelligently switch back and forth between a historical data source (like Iceberg) and a live data source (like Kafka). E.g.,
> - if the Flink job is lagging behind Kafka retention, automatically switch to the Iceberg source
> - once the job has caught up, switch back to the Kafka source
>
> That can simplify the operational aspects of manual switching.
>
>
> On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:
>>
>> Sorry for joining the party so late, but it's such an interesting FLIP with
>> a huge impact that I wanted to add my 2 cents. [1]
>> I'm mirroring some basic questions from the PR review to this thread because
>> they're about the name:
>>
>> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
>> similar.
>> Hybrid has the connotation of 2 for me (maybe because I'm a non-native
>> speaker) and does not carry the concatenation concept as well (hybrid
>> sounds to me more like the source would constantly switch back and forth).
>>
>> Could we take a few minutes to think about whether this is the most
>> intuitive name for new users? I'm especially hoping that native speakers
>> might give some ideas (or declare that Hybrid is perfect).
>>
>> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
>>
>> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <st...@gmail.com> wrote:
>>
>> > > Converter function relies on the specific enumerator capabilities to set
>> > > the new start position (e.g.
>> > > fileSourceEnumerator.getEndTimestamp() and
>> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
>> >
>> > I guess the premise is that a converter is for a specific tuple of
>> > (upstream source, downstream source). We don't have to define generic
>> > EndStateT and SwitchableEnumerator interfaces. That should work.
>> >
>> > The benefit of defining EndStateT and SwitchableEnumerator interfaces is
>> > probably promoting uniformity across sources that support hybrid/switchable
>> > source.
>> >
>> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <th...@apache.org> wrote:
>> >
>> > > Hi Steven,
>> > >
>> > > Thank you for the thorough review of the PR and for bringing this back
>> > > to the mailing list.
>> > >
>> > > All,
>> > >
>> > > I updated the FLIP-150 page to highlight aspects in which the PR
>> > > deviates from the original proposal [1]. The goal would be to update
>> > > the FLIP soon and bring it to a vote, as previously suggested offline
>> > > by Nicholas.
>> > >
>> > > A few minor issues in the PR are outstanding and I'm working on test
>> > > coverage for the recovery behavior, which should be completed soon.
>> > >
>> > > The dynamic position transfer needs to be concluded before we can move
>> > > forward however.
>> > >
>> > > There have been various ideas, including the special
>> > > "SwitchableEnumerator" interface, using enumerator checkpoint state or
>> > > an enumerator interface extension to extract the end state.
>> > >
>> > > One goal in the FLIP is to "Reuse the existing Source connectors built
>> > > with FLIP-27 without any change." and I think it is important to honor
>> > > that goal given that fixed start positions do not require interface
>> > > changes.
>> > >
>> > > Based on the feedback the following might be a good solution for
>> > > runtime position transfer:
>> > >
>> > > * User supplies the optional converter function (not applicable for
>> > > fixed positions).
>> > > * Instead of relying on the enumerator checkpoint state [2], the
>> > > converter function will be supplied with the current and next
>> > > enumerator (source.createEnumerator).
>> > > * Converter function relies on the specific enumerator capabilities to
>> > > set the new start position (e.g.
>> > > fileSourceEnumerator.getEndTimestamp() and
>> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
>> > > * HybridSourceSplitEnumerator starts the new underlying enumerator
>> > >
>> > > With this approach, there is no need to augment FLIP-27 interfaces and
>> > > custom source capabilities are easier to integrate. Removing the
>> > > mandate to rely on enumerator checkpoint state also avoids potential
>> > > upgrade/compatibility issues.
>> > >
>> > > Thoughts?
>> > >
>> > > Thanks,
>> > > Thomas
>> > >
>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
>> > > [2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>> > >
>> > >
>> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <st...@gmail.com> wrote:
>> > > >
>> > > > Discussed the PR with Thomas offline. Thomas, please correct me if I
>> > > > missed anything.
>> > > >
>> > > > Right now, the PR differs from the FLIP-150 doc regarding the converter.
>> > > > * Current PR uses the enumerator checkpoint state type as the input for
>> > > > the converter
>> > > > * FLIP-150 defines a new EndStateT interface.
>> > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
>> > > > the transition EndStateT doesn't have to be included in the upstream
>> > > > source checkpoint state.
>> > > >
>> > > > Let's look at two use cases:
>> > > > 1) Static cutover time at 5 pm. The file source reads all data between
>> > > > 9 am and 5 pm, then the Kafka source starts with an initial position of
>> > > > 5 pm. In this case, there is no need for a converter or EndStateT since
>> > > > the starting time for the Kafka source is known and fixed.
>> > > > 2) Dynamic cutover time at 1 hour before now. This is useful when the
>> > > > bootstrap of historic data takes a long time (like days or weeks) and
>> > > > we don't know the exact time of cutover when a job is launched.
>> > > > Instead, we are instructing the file source to stop when it gets close
>> > > > to live data. In this case, hybrid source construction will specify a
>> > > > relative time (now - 1 hour), and the EndStateT (of the file source)
>> > > > will be resolved to an absolute time for cutover. We probably don't
>> > > > need to include EndStateT (end timestamp) in the file source
>> > > > checkpoint state. Hence, the separate EndStateT is probably more
>> > > > desirable.
>> > > >
>> > > > We also discussed the converter for the Kafka source. The Kafka source
>> > > > supports different OffsetsInitializer impls (including
>> > > > TimestampOffsetsInitializer). To support the dynamic cutover time (use
>> > > > case #2 above), we can plug in a SupplierTimestampOffsetInitializer,
>> > > > where the starting timestamp is not set during source/job
>> > > > construction. Rather, it is a supplier model where the starting
>> > > > timestamp value is set to the resolved absolute timestamp during the
>> > > > switch.
>> > > >
>> > > > Thanks,
>> > > > Steven
>> > > >
>> > > >
>> > > >
>> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <th...@apache.org> wrote:
>> > > >
>> > > > > Hi Nicholas,
>> > > > >
>> > > > > Thanks for taking a look at the PR!
>> > > > >
>> > > > > 1. Regarding switching mechanism:
>> > > > >
>> > > > > There has been previous discussion in this thread regarding the pros
>> > > > > and cons of how the switching can be exposed to the user.
>> > > > >
>> > > > > With fixed start positions, no special switching interface to transfer
>> > > > > information between enumerators is required. Sources are configured as
>> > > > > they would be when used standalone and just plugged into HybridSource.
>> > > > > I expect that to be a common use case. You can find an example for
>> > > > > this in the ITCase:
>> > > > >
>> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
>> > > > >
>> > > > > For dynamic start position, the checkpoint state is used to transfer
>> > > > > information from old to new enumerator. An example for that can be
>> > > > > found here:
>> > > > >
>> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
>> > > > >
>> > > > > That may look verbose, but the code to convert from one state to
>> > > > > another can be factored out into a utility and the function becomes a
>> > > > > one-liner.
>> > > > >
>> > > > > For common sources like files and Kafka we can potentially (later)
>> > > > > implement the conversion logic as part of the respective connector's
>> > > > > checkpoint and split classes.
>> > > > >
>> > > > > I hope that with the PR up for review, we can soon reach a conclusion
>> > > > > on how we want to expose this to the user.
>> > > > >
>> > > > > Following is an example for Files -> Files -> Kafka that I'm using for
>> > > > > e2e testing. It exercises both ways of setting the start position.
>> > > > >
>> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
>> > > > >
>> > > > >
>> > > > > 2. Regarding the events used to implement the actual switch between
>> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
>> > > > > intent. Please let me know if that helps or let's continue to discuss
>> > > > > those details on the PR?
>> > > > >
>> > > > >
>> > > > > Thanks,
>> > > > > Thomas
>> > > > >
>> > > > >
>> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <pr...@163.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > Hi Thomas,
>> > > > > >
>> > > > > >    Sorry for the late reply on your POC. I have reviewed the base
>> > > > > > abstract implementation of your pull request:
>> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
>> > > > > > mechanism, this level of abstraction is not concise enough and
>> > > > > > doesn't make connector contribution easier. In theory, it is
>> > > > > > necessary to introduce a set of interfaces to support the switching
>> > > > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
>> > > > > > interfaces are needed for connector extensibility.
>> > > > > >    In other words, the whole switching process of the above-mentioned
>> > > > > > PR is different from that described in FLIP-150. In the above
>> > > > > > implementation, the source reading switch is executed after
>> > > > > > receiving the SwitchSourceEvent, which could happen before the
>> > > > > > SourceReaderFinishEvent is sent. This timeline of the source reading
>> > > > > > switch could be discussed here.
>> > > > > >    @Stephan @Becket, if you are available, please help to review the
>> > > > > > abstract implementation and compare it with the interfaces mentioned
>> > > > > > in FLIP-150.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Nicholas Jiang
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > >
>> >
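
The dynamic cutover case from Steven's Jun 1 message quoted above (a relative "now - 1 hour" resolved to an absolute timestamp, then handed to a supplier-style Kafka offsets initializer) can be sketched in plain Java as follows. SupplierTimestampInitializerSketch and DynamicCutoverSketch are hypothetical names; they are neither the proposed SupplierTimestampOffsetInitializer nor any existing Kafka connector class. The resolved value lives in memory only, so how it would survive a restore is deliberately not addressed.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical timestamp-based start-position initializer whose timestamp is
// pulled from a supplier when the downstream source starts, not at job build time.
final class SupplierTimestampInitializerSketch {
    private final Supplier<Instant> startTimestamp;

    SupplierTimestampInitializerSketch(Supplier<Instant> startTimestamp) {
        this.startTimestamp = Objects.requireNonNull(startTimestamp);
    }

    Instant resolveStartTimestamp() {
        Instant ts = startTimestamp.get();
        if (ts == null) {
            throw new IllegalStateException("cutover timestamp has not been resolved yet");
        }
        return ts;
    }
}

// Holds the relative cutover ("now - lag") until the file phase resolves it once.
final class DynamicCutoverSketch {
    private final Duration lag;
    private final Clock clock;
    private final AtomicReference<Instant> cutover = new AtomicReference<>();

    DynamicCutoverSketch(Duration lag, Clock clock) {
        this.lag = Objects.requireNonNull(lag);
        this.clock = Objects.requireNonNull(clock);
    }

    // Resolves the relative end time to an absolute one; only the first call wins.
    Instant resolveFileSourceEnd() {
        cutover.compareAndSet(null, clock.instant().minus(lag));
        return cutover.get();
    }

    // Initializer for the downstream (Kafka-like) source; reads the resolved value lazily.
    SupplierTimestampInitializerSketch downstreamStart() {
        return new SupplierTimestampInitializerSketch(cutover::get);
    }
}
```

The downstream initializer can be built together with the rest of the job, because it only dereferences the supplier when the switch actually happens.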

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Steven Wu <st...@gmail.com>.
> hybrid sounds to me more like the source would constantly switch back and forth

Initially, the focus of hybrid source is more like a sequenced chain.

But in the future it would be cool if hybrid sources could intelligently
switch back and forth between a historical data source (like Iceberg) and a
live data source (like Kafka). E.g.,
- if the Flink job is lagging behind Kafka retention, automatically switch
  to the Iceberg source
- once the job has caught up, switch back to the Kafka source

That can simplify the operational aspects of manual switching.
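
The back-and-forth idea above can be pictured as a small decision function. This is a hypothetical sketch, not part of the FLIP or the PR: ActiveSource and BackAndForthPolicySketch are made-up names, "caught up" is assumed to mean the read position is back inside the live system's retention window, and the switch-back margin is an added assumption meant to keep the source from flapping at the retention boundary.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

enum ActiveSource { HISTORICAL, LIVE }

// Hypothetical policy for a hybrid source that can switch in both directions.
final class BackAndForthPolicySketch {
    private final Duration liveRetention;    // e.g. Kafka topic retention
    private final Duration switchBackMargin; // hysteresis so we do not flap at the boundary

    BackAndForthPolicySketch(Duration liveRetention, Duration switchBackMargin) {
        this.liveRetention = Objects.requireNonNull(liveRetention);
        this.switchBackMargin = Objects.requireNonNull(switchBackMargin);
    }

    ActiveSource choose(ActiveSource current, Instant readPosition, Instant now) {
        // Oldest data still available in the live system.
        Instant earliestLive = now.minus(liveRetention);
        if (current == ActiveSource.LIVE && readPosition.isBefore(earliestLive)) {
            // Lagging behind retention: the needed data is only in the historical store.
            return ActiveSource.HISTORICAL;
        }
        if (current == ActiveSource.HISTORICAL
                && !readPosition.isBefore(earliestLive.plus(switchBackMargin))) {
            // Far enough inside the retention window again: go back to the live source.
            return ActiveSource.LIVE;
        }
        return current;
    }
}
```

Keeping the policy a pure function of (current source, read position, now) makes it easy to test in isolation; how the enumerators would actually hand positions over in each direction is a separate question.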



Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Arvid Heise <ar...@apache.org>.
Sorry for joining the party so late, but it's such an interesting FLIP with
a huge impact that I wanted to add my 2 cents. [1]
I'm mirroring some basic questions from the PR review to this thread because
they're about the name:

We could rename the thing to ConcatenatedSource(s), SourceSequence, or
similar.
Hybrid has the connotation of 2 for me (maybe because I'm a non-native
speaker) and does not carry the concatenation concept as well (hybrid
sounds to me more like the source would constantly switch back and forth).

Could we take a few minutes to think about whether this is the most
intuitive name for new users? I'm especially hoping that native speakers
might give some ideas (or declare that Hybrid is perfect).

[1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664

> > abstract
> > > > > implementation of your pull request:
> > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > > mechanism, this level of abstraction is not concise enough, which
> > doesn't
> > > > > make connector contribution easier. In theory, it is necessary to
> > > > introduce
> > > > > a set of interfaces to support the switching mechanism. The
> > > > SwitchableSource
> > > > > and SwitchableSplitEnumerator interfaces are needed for connector
> > > > > expansibility.
> > > > >    In other words, the whole switching process of above mentioned
> PR
> > is
> > > > > different from that mentioned in FLIP-150. In the above
> > implementation,
> > > > the
> > > > > source reading switching is executed after receving the
> > > > SwitchSourceEvent,
> > > > > which could be before the sending SourceReaderFinishEvent. This
> > timeline
> > > > of
> > > > > source reading switching could be discussed here.
> > > > >    @Stephan @Becket, if you are available, please help to review
> the
> > > > > abstract implementation, and compare with the interfaces mentioned
> in
> > > > > FLIP-150.
> > > > >
> > > > > Thanks,
> > > > > Nicholas Jiang
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sent from:
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > >
> >
>

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Steven Wu <st...@gmail.com>.
> Converter function relies on the specific enumerator capabilities to set
> the new start position (e.g. fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))

I guess the premise is that a converter is written for a specific pair of
(upstream source, downstream source). Then we don't have to define generic
EndStateT and SwitchableEnumerator interfaces. That should work.

The benefit of defining EndStateT and SwitchableEnumerator interfaces would
probably be uniformity across the sources that can participate in a
hybrid/switchable source.
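For comparison, the uniform-interface alternative could look roughly like the following minimal Java sketch. `EndStateProvider`, `StartStateAcceptor`, `TimestampedFileEnumerator`, `TimestampedKafkaEnumerator`, and `UniformTransfer` are hypothetical names for illustration only, not Flink or PR APIs, and the end state is assumed to be a plain timestamp in milliseconds.

```java
import java.util.function.Function;

// Hypothetical interface (not a Flink API): an upstream enumerator that reports where it stopped.
interface EndStateProvider<EndStateT> {
    EndStateT getEndState();
}

// Hypothetical interface (not a Flink API): a downstream enumerator that accepts a start position.
interface StartStateAcceptor<StartStateT> {
    void setStartState(StartStateT startState);
}

// Hypothetical upstream enumerator whose end state is the last event timestamp it read.
final class TimestampedFileEnumerator implements EndStateProvider<Long> {
    private final long endTimestampMillis;

    TimestampedFileEnumerator(long endTimestampMillis) {
        this.endTimestampMillis = endTimestampMillis;
    }

    @Override
    public Long getEndState() {
        return endTimestampMillis;
    }
}

// Hypothetical downstream enumerator that starts reading from a timestamp.
final class TimestampedKafkaEnumerator implements StartStateAcceptor<Long> {
    private Long startTimestampMillis; // stays null until the switch happens

    @Override
    public void setStartState(Long startState) {
        this.startTimestampMillis = startState;
    }

    Long getStartTimestampMillis() {
        return startTimestampMillis;
    }
}

final class UniformTransfer {
    // One generic hand-over works for any upstream/downstream pair; only the
    // (possibly identity) mapping between the two state types is pair-specific.
    static <E, S> void transfer(
            EndStateProvider<E> upstream, StartStateAcceptor<S> downstream, Function<E, S> mapping) {
        downstream.setStartState(mapping.apply(upstream.getEndState()));
    }

    public static void main(String[] args) {
        TimestampedFileEnumerator files = new TimestampedFileEnumerator(1_623_000_000_000L);
        TimestampedKafkaEnumerator kafka = new TimestampedKafkaEnumerator();
        transfer(files, kafka, Function.identity());
        System.out.println("Kafka starts at " + kafka.getStartTimestampMillis());
    }
}
```

The converter-per-pair alternative avoids adding such interfaces to existing FLIP-27 sources; the trade-off is that each (upstream, downstream) combination then needs its own converter.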

On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <th...@apache.org> wrote:


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Hi Steven,

Thank you for the thorough review of the PR and for bringing this back
to the mailing list.

All,

I updated the FLIP-150 page to highlight aspects in which the PR
deviates from the original proposal [1]. The goal would be to update
the FLIP soon and bring it to a vote, as previously suggested offline
by Nicholas.

A few minor issues in the PR are outstanding and I'm working on test
coverage for the recovery behavior, which should be completed soon.

However, the dynamic position transfer needs to be settled before we can
move forward.

There have been various ideas, including the special
"SwitchableEnumerator" interface, using enumerator checkpoint state or
an enumerator interface extension to extract the end state.

One goal in the FLIP is to "Reuse the existing Source connectors built
with FLIP-27 without any change." and I think it is important to honor
that goal given that fixed start positions do not require interface
changes.

Based on the feedback, the following might be a good solution for
runtime position transfer:

* User supplies the optional converter function (not applicable for
fixed positions).
* Instead of relying on the enumerator checkpoint state [2], the
converter function will be supplied with the current and next
enumerator (source.createEnumerator).
* The converter function relies on the specific enumerator capabilities to
set the new start position (e.g.
fileSourceEnumerator.getEndTimestamp() and
kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)).
* HybridSourceSplitEnumerator starts the new underlying enumerator.
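A minimal Java sketch of that flow, under stated assumptions: `SimpleEnumerator`, `FileEnumeratorSketch`, `KafkaEnumeratorSketch`, and `HybridSwitchSketch` are hypothetical stand-ins, not the classes in the PR or in Flink. `getEndTimestamp()` and `setStartTimestamp(...)` only mimic the kind of connector-specific capabilities the proposal refers to, and the `Supplier` plays the role of `source.createEnumerator`.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical minimal enumerator contract (the real FLIP-27 SplitEnumerator is much richer).
interface SimpleEnumerator {
    void start();

    boolean isStarted();
}

// Hypothetical file enumerator exposing a connector-specific capability.
final class FileEnumeratorSketch implements SimpleEnumerator {
    private final long endTimestamp;
    private boolean started;

    FileEnumeratorSketch(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }

    @Override public void start() { started = true; }
    @Override public boolean isStarted() { return started; }
}

// Hypothetical Kafka-like enumerator whose start position can be set before start().
final class KafkaEnumeratorSketch implements SimpleEnumerator {
    private Long startTimestamp; // null means "keep the position the source was configured with"
    private boolean started;

    void setStartTimestamp(long startTimestamp) {
        if (started) {
            throw new IllegalStateException("start position must be set before start()");
        }
        this.startTimestamp = startTimestamp;
    }

    Long getStartTimestamp() { return startTimestamp; }

    @Override public void start() { started = true; }
    @Override public boolean isStarted() { return started; }
}

// Hypothetical switching step: create the next enumerator, let the optional
// converter copy the position across, then start the new enumerator.
final class HybridSwitchSketch<C extends SimpleEnumerator, N extends SimpleEnumerator> {
    private final Supplier<N> createNext;     // stands in for source.createEnumerator(...)
    private final BiConsumer<C, N> converter; // null for fixed start positions

    HybridSwitchSketch(Supplier<N> createNext, BiConsumer<C, N> converter) {
        this.createNext = createNext;
        this.converter = converter;
    }

    N switchFrom(C current) {
        N next = createNext.get();
        if (converter != null) {
            converter.accept(current, next);
        }
        next.start();
        return next;
    }

    public static void main(String[] args) {
        HybridSwitchSketch<FileEnumeratorSketch, KafkaEnumeratorSketch> sw =
                new HybridSwitchSketch<>(
                        KafkaEnumeratorSketch::new,
                        (file, kafka) -> kafka.setStartTimestamp(file.getEndTimestamp()));
        KafkaEnumeratorSketch kafka = sw.switchFrom(new FileEnumeratorSketch(1_700L));
        System.out.println("Kafka started at " + kafka.getStartTimestamp());
    }
}
```

In this shape the connector-specific knowledge lives entirely in the user-supplied lambda, which is why no change to the enumerator interfaces themselves is needed.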

With this approach, there is no need to augment FLIP-27 interfaces and
custom source capabilities are easier to integrate. Removing the
mandate to rely on enumerator checkpoint state also avoids potential
upgrade/compatibility issues.

Thoughts?

Thanks,
Thomas

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
[2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281


On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <st...@gmail.com> wrote:

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Steven Wu <st...@gmail.com>.
I discussed the PR with Thomas offline. Thomas, please correct me if I
missed anything.

Right now, the PR differs from the FLIP-150 doc regarding the converter.
* Current PR uses the enumerator checkpoint state type as the input for the
converter
* FLIP-150 defines a new EndStateT interface.
It seems that the FLIP-150 approach of EndStateT is more flexible, as the
transition EndStateT doesn't have to be included in the upstream source's
checkpoint state.

Let's look at two use cases:
1) Static cutover time at 5 pm. The file source reads all data between 9 am
and 5 pm, then the Kafka source starts with an initial position of 5 pm. In
this case, there is no need for a converter or EndStateT, since the starting
time for the Kafka source is known and fixed.
2) Dynamic cutover time at 1 hour before now. This is useful when the
bootstrap of historic data takes a long time (like days or weeks) and we
don't know the exact cutover time when a job is launched. Instead, we
instruct the file source to stop when it gets close to live data. In this
case, the hybrid source is constructed with a relative time (now - 1 hour),
and the EndStateT (of the file source) is resolved to an absolute cutover
time. We probably don't need to include the EndStateT (end timestamp) in the
file source checkpoint state. Hence, a separate EndStateT is probably more
desirable.
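The difference between the two use cases can be sketched in a few lines of Java. `CutoverSpec` and `CutoverDemo` are hypothetical names, not Flink or PR types; the sketch assumes the cutover is an event-time `Instant` and injects a `java.time.Clock` so that "now" can be pinned in tests.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical cutover specification, resolved to an absolute instant when needed.
@FunctionalInterface
interface CutoverSpec {
    Instant resolve(Clock clock);

    // Use case 1: the absolute cutover (e.g. 5 pm) is known when the job is built.
    static CutoverSpec fixed(Instant cutover) {
        return clock -> cutover;
    }

    // Use case 2: only a relative cutover (now - lag) is known; it is resolved
    // against the clock at the time resolve() is called, not at construction.
    static CutoverSpec beforeNow(Duration lag) {
        return clock -> clock.instant().minus(lag);
    }

    // Records strictly before the cutover come from the file source and the rest
    // from Kafka, so the two sources neither overlap nor leave a gap.
    static boolean readFromFiles(Instant recordTime, Instant resolvedCutover) {
        return recordTime.isBefore(resolvedCutover);
    }
}

final class CutoverDemo {
    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.parse("2021-06-01T20:00:00Z"), ZoneOffset.UTC);
        Instant staticCutover = CutoverSpec.fixed(Instant.parse("2021-06-01T17:00:00Z")).resolve(clock);
        Instant dynamicCutover = CutoverSpec.beforeNow(Duration.ofHours(1)).resolve(clock);
        System.out.println("static:  " + staticCutover);
        System.out.println("dynamic: " + dynamicCutover);
    }
}
```

In use case 2 the resolved instant is the value described above as the file source's EndStateT; exactly when it gets resolved (at job launch, or when the file source nears live data) is a design choice this sketch leaves open.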

We also discussed the converter for the Kafka source. The Kafka source supports
different OffsetsInitializer implementations (including TimestampOffsetsInitializer).
To support the dynamic cutover time (use case #2 above), we can plug in a
SupplierTimestampOffsetInitializer, where the starting timestamp is not set
during source/job construction. Instead, it follows a supplier model: the
starting timestamp value is set to the resolved absolute timestamp during the
switch.
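A minimal Java sketch of the supplier model. `SupplierTimestampOffsetsSketch` is hypothetical and deliberately does not implement Flink's real `OffsetsInitializer` interface. A partition is modelled as an in-memory `NavigableMap` from record timestamp to offset, assuming timestamps are unique and increase with offsets (as with log-append time).

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical offsets initializer whose start timestamp is supplied lazily.
final class SupplierTimestampOffsetsSketch {
    static final long LATEST = -1L; // no record at/after the timestamp: start at the log end

    private final Supplier<Long> startTimestamp;

    SupplierTimestampOffsetsSketch(Supplier<Long> startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    // Offset of the first record whose timestamp is >= the supplied start timestamp.
    long startingOffset(NavigableMap<Long, Long> timestampToOffset) {
        Long ts = startTimestamp.get(); // read only now, i.e. at switch time
        if (ts == null) {
            throw new IllegalStateException("cutover timestamp has not been resolved yet");
        }
        Map.Entry<Long, Long> first = timestampToOffset.ceilingEntry(ts);
        return first == null ? LATEST : first.getValue();
    }

    public static void main(String[] args) {
        // The holder is empty when the source/job is constructed ...
        AtomicReference<Long> cutover = new AtomicReference<>();
        SupplierTimestampOffsetsSketch init = new SupplierTimestampOffsetsSketch(cutover::get);

        NavigableMap<Long, Long> partition = new TreeMap<>();
        partition.put(100L, 0L);
        partition.put(200L, 1L);
        partition.put(300L, 2L);

        // ... and filled in during the switch with the resolved absolute timestamp.
        cutover.set(150L);
        System.out.println("start offset: " + init.startingOffset(partition));
    }
}
```

Failing loudly when the supplier is still empty makes an accidental use before the switch visible instead of silently starting from an arbitrary position.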

Thanks,
Steven



On Thu, May 20, 2021 at 8:59 PM Thomas Weise <th...@apache.org> wrote:


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Hi Nicholas,

Thanks for taking a look at the PR!

1. Regarding switching mechanism:

There has been previous discussion in this thread regarding the pros
and cons of how the switching can be exposed to the user.

With fixed start positions, no special switching interface to transfer
information between enumerators is required. Sources are configured as
they would be when used standalone and just plugged into HybridSource.
I expect that to be a common use case. You can find an example for
this in the ITCase:

https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
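With fixed start positions, the hybrid layer only has to run the sources one after another. The following Java sketch models that with plain `Iterable`s. `FixedChainSketch` is a hypothetical name, and a real hybrid source coordinates enumerators and readers rather than iterators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model: each "source" is pre-configured with its own fixed start
// position, exactly as it would be standalone, so the chain only runs them in order.
final class FixedChainSketch<T> implements Iterable<T> {
    private final List<Iterable<T>> sources;

    @SafeVarargs
    FixedChainSketch(Iterable<T>... sources) {
        this.sources = Arrays.asList(sources);
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int index = 0;
            private Iterator<T> current =
                    sources.isEmpty() ? Collections.<T>emptyIterator() : sources.get(0).iterator();

            @Override
            public boolean hasNext() {
                // Move to the next source only once the current one is exhausted;
                // nothing is handed over, because every source already knows its start.
                while (!current.hasNext() && index + 1 < sources.size()) {
                    current = sources.get(++index).iterator();
                }
                return current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> fileHours = Arrays.asList(9, 12, 16); // bounded file source, 9 am to 5 pm
        List<Integer> kafkaHours = Arrays.asList(17, 18);   // Kafka configured to start at 5 pm (finite here)
        List<Integer> merged = new ArrayList<>();
        for (int hour : new FixedChainSketch<>(fileHours, kafkaHours)) {
            merged.add(hour);
        }
        System.out.println(merged);
    }
}
```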

For dynamic start position, the checkpoint state is used to transfer
information from old to new enumerator. An example for that can be
found here:

https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136

That may look verbose, but the code to convert from one state to
another can be factored out into a utility and the function becomes a
one-liner.

For common sources like files and Kafka we can potentially (later)
implement the conversion logic as part of the respective connector's
checkpoint and split classes.
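To illustrate how such a conversion can shrink to a one-liner once it is factored into a utility, here is a Java sketch. `FileSplitSnapshot`, `FileCheckpointSketch`, and `CheckpointConversions` are hypothetical; they do not reflect the actual checkpoint classes of Flink's file source, and the sketch assumes the checkpoint records the largest event timestamp seen per processed file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical per-file entry in an upstream enumerator checkpoint.
final class FileSplitSnapshot {
    final String path;
    final long maxEventTimestampMillis;

    FileSplitSnapshot(String path, long maxEventTimestampMillis) {
        this.path = path;
        this.maxEventTimestampMillis = maxEventTimestampMillis;
    }
}

// Hypothetical upstream enumerator checkpoint: the files it has fully processed.
final class FileCheckpointSketch {
    final List<FileSplitSnapshot> processed;

    FileCheckpointSketch(List<FileSplitSnapshot> processed) {
        this.processed = processed;
    }
}

final class CheckpointConversions {
    // Reusable helper: the largest event timestamp covered by the file source.
    // Records carrying exactly this timestamp may be read again if the next
    // source starts at ">= timestamp"; adjust if duplicates matter.
    static long endTimestamp(FileCheckpointSketch checkpoint) {
        return checkpoint.processed.stream()
                .mapToLong(s -> s.maxEventTimestampMillis)
                .max()
                .orElseThrow(() -> new IllegalStateException("no files processed"));
    }

    // With the helper in place, the user-supplied converter is a one-liner.
    static final Function<FileCheckpointSketch, Long> FILES_TO_KAFKA_START =
            CheckpointConversions::endTimestamp;

    public static void main(String[] args) {
        FileCheckpointSketch checkpoint = new FileCheckpointSketch(Arrays.asList(
                new FileSplitSnapshot("/data/day-1", 1_000L),
                new FileSplitSnapshot("/data/day-2", 2_500L)));
        System.out.println("Kafka start timestamp: " + FILES_TO_KAFKA_START.apply(checkpoint));
    }
}
```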

I hope that with the PR up for review, we can soon reach a conclusion
on how we want to expose this to the user.

The following is an example of Files -> Files -> Kafka that I'm using for
e2e testing. It exercises both ways of setting the start position.

https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a


2. Regarding the events used to implement the actual switch between
enumerator and readers: I updated the PR with javadoc to clarify the
intent. Please let me know if that helps, or should we continue to discuss
those details on the PR?


Thanks,
Thomas


On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <pr...@163.com> wrote:

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Nicholas Jiang <pr...@163.com>.
Hi Thomas,

   Sorry for the late reply regarding your PoC. I have reviewed the base
abstract implementation in your pull request:
https://github.com/apache/flink/pull/15924. IMO, for the switching
mechanism, this level of abstraction is not concise enough and doesn't
make connector contributions easier. In principle, it is necessary to
introduce a set of interfaces to support the switching mechanism: the
SwitchableSource and SwitchableSplitEnumerator interfaces are needed for
connector extensibility.
   In other words, the overall switching process in the above-mentioned PR
is different from the one described in FLIP-150. In that implementation,
the source reading switch is executed after receiving the
SwitchSourceEvent, which could happen before the SourceReaderFinishEvent
is sent. The timeline of the source reading switch could be discussed here.
   @Stephan @Becket, if you are available, please help review the abstract
implementation and compare it with the interfaces described in FLIP-150.
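The ordering concern can be made concrete with a small Java model of one conservative ordering: the enumerator side only announces the switch after every reader has reported that it finished the current source. `SwitchCoordinatorSketch` is hypothetical; the event names follow this thread, but this is not the logic of the PR, and it assumes all splits of the current source have already been assigned to readers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical enumerator-side bookkeeping for switching between sources.
final class SwitchCoordinatorSketch {
    private final int parallelism;
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final List<String> sentEvents = new ArrayList<>();
    private int currentSourceIndex = 0;

    SwitchCoordinatorSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    // Handles a SourceReaderFinishEvent-style report from one reader subtask.
    void onReaderFinished(int subtask, int sourceIndex) {
        if (sourceIndex != currentSourceIndex) {
            return; // late or duplicate report for a source we already left
        }
        finishedReaders.add(subtask); // a set, so repeated reports are idempotent
        if (finishedReaders.size() == parallelism) {
            // Every reader has drained the current source: only now announce the switch.
            currentSourceIndex++;
            finishedReaders.clear();
            for (int reader = 0; reader < parallelism; reader++) {
                sentEvents.add("SwitchSourceEvent(reader=" + reader + ", source=" + currentSourceIndex + ")");
            }
        }
    }

    int currentSourceIndex() { return currentSourceIndex; }

    List<String> sentEvents() { return Collections.unmodifiableList(sentEvents); }

    public static void main(String[] args) {
        SwitchCoordinatorSketch coordinator = new SwitchCoordinatorSketch(2);
        coordinator.onReaderFinished(0, 0);
        System.out.println("after reader 0: " + coordinator.sentEvents());
        coordinator.onReaderFinished(1, 0);
        System.out.println("after reader 1: " + coordinator.sentEvents());
    }
}
```

A global barrier like this is the simplest way to rule out a SwitchSourceEvent overtaking a finish report; a per-reader switch would allow more overlap but needs more careful bookkeeping.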

Thanks,
Nicholas Jiang



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Becket Qin <be...@gmail.com>.
Thanks for the clarification, Thomas. Yes, that makes sense to me.

Cheers,

Jiangjie (Becket) Qin

On Mon, Apr 26, 2021 at 1:03 AM Thomas Weise <th...@apache.org> wrote:

> Hi Becket,
>
> I agree and am not planning to hard-wire a specific combination of
> sources (like S3 + Kafka). That also wouldn't help for the use case I
> want to address, because there are customized connectors that we need
> to be able to plug in.
>
> Rather, the suggested simplification would be for the flexibility of the
> switching mechanism.
>
> The prototype already supports fixed start positions and checkpoint
> conversion for any combination of sources; no need to undo that.
>
> But for testing/example purposes, we will need to settle on a specific
> combination.
>
> Thomas
>
> On Sat, Apr 24, 2021 at 8:20 PM Becket Qin <be...@gmail.com> wrote:
> >
> > Sorry for the late reply. Starting from a specific connector sounds
> > reasonable to me.
> >
> > That said, I would suggest keeping the possibility of future
> > generalization as much as possible. We have already seen some variations
> > of source combinations from different users: HDFS + Kafka, S3 + Kafka,
> > S3 + SQL Binlog, etc. So it would be good if we can reuse some base
> > abstraction in the future instead of having to write each combination
> > from scratch.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Thanks, Thomas!
> > >
> > > @Becket and @Nicholas - would you be ok with that approach?
> > >
> > >
> > > On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <th...@apache.org> wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > I agree with the approach of starting with a simple implementation
> > > > that can address a well understood, significant portion of use cases.
> > > >
> > > > I'm planning to continue work on the prototype that I had shared.
> > > > There is production level usage waiting for it fairly soon. I expect
> > > > to open a PR in the coming weeks.
> > > >
> > > > Thomas
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
> > > > >
> > > > > Thanks all for this discussion. Looks like there are lots of ideas and
> > > > > folks that are eager to do things, so let's see how we can get this
> > > > > moving.
> > > > >
> > > > > My take on this is the following:
> > > > >
> > > > > There will probably not be one Hybrid source, but possibly multiple
> > > > > ones, because of different strategies/requirements.
> > > > >     - One may be very simple, with switching points known up-front.
> > > > > Would be good to have this in a very simple implementation.
> > > > >     - There may be one where the switch is dynamic and the readers
> > > > > need to report back where they left off.
> > > > >     - There may be one that switches back and forth multiple times
> > > > > during a job, for example Kafka going to DFS whenever it falls behind
> > > > > retention, in order to catch up again.
> > > > >
> > > > > This also seems hard to "design on paper"; I expect there are
> > > > > nuances in a production setup that affect some details of the
> > > > > design. So I'd feel most comfortable adding a variant of the hybrid
> > > > > source to Flink that has been used already in a real use case (not
> > > > > necessarily in production, but maybe in a testing/staging
> > > > > environment, so it seems to meet all requirements).
> > > > >
> > > > >
> > > > > What do you think about the following approach?
> > > > >   - If there is a tested PoC, let's try to get it contributed to
> > > > > Flink without trying to make it much more general.
> > > > >   - When we see similar but a bit different requirements for another
> > > > > hybrid source, then let's try to evolve the contributed one.
> > > > >   - If we see new requirements that are so different that they don't
> > > > > fit well with the existing hybrid source, then let us look at
> > > > > building a second hybrid source for those requirements.
> > > > >
> > > > > We need to make connector contributions in general easier, and I
> > > > > think it is not a bad thing to end up with different approaches and
> > > > > see how these play out against each other when being used by users.
> > > > > For example switching with known boundaries, dynamic switching,
> > > > > back-and-forth-switching, etc.
> > > > > (I know some committers are planning to do some work on making
> > > > > connector contributions easier, with standardized testing
> > > > > frameworks, decoupled CI, etc.)
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <th...@apache.org>
> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > As mentioned in my previous email, I had been working on a
> prototype
> > > > for
> > > > > > the hybrid source.
> > > > > >
> > > > > > You can find it at https://github.com/tweise/flink/pull/1
> > > > > >
> > > > > > It contains:
> > > > > > * Switching with configurable chain of sources
> > > > > > * Fixed or dynamic start positions
> > > > > > * Test with MockSource and FileSource
> > > > > >
> > > > > > The purpose of the above PR is to gather feedback and help drive
> > > > consensus
> > > > > > on the FLIP.
> > > > > >
> > > > > > * How to support a dynamic start position within the source
> chain?
> > > > > >
> > > > > > Relevant in those (few?) cases where start positions are not
> known
> > > > upfront.
> > > > > > You can find an example of what that might look like in the
> tests:
> > > > > >
> > > > > >
> > > > > >
> > > >
> > >
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > > > >
> > > > > >
> > > >
> > >
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > > > >
> > > > > > When switching, the enumerator of the previous source needs to
> > > > > > supply information about consumed splits that allows to set the
> start
> > > > > > position for the next source. That could be something like the
> last
> > > > > > processed file, timestamp, etc. (Currently
> StaticFileSplitEnumerator
> > > > > > doesn't track finished splits.)
> > > > > >
> > > > > > See previous discussion regarding start/end position. The
> prototype
> > > > shows
> > > > > > the use of checkpoint state with converter function.
> > > > > >
> > > > > > * Should readers be deployed dynamically?
> > > > > >
> > > > > > The prototype assumes a static source chain that is fixed at job
> > > > submission
> > > > > > time. Conceivably there could be use cases that require more
> > > > flexibility.
> > > > > > Such as switching one KafkaSource for another. A step in that
> > > direction
> > > > > > would be to deploy the actual readers dynamically, at the time of
> > > > switching
> > > > > > source.
> > > > > >
> > > > > > Looking forward to feedback and suggestions for next steps!
> > > > > >
> > > > > > Thomas
> > > > > >
> > > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <th...@apache.org>
> > > wrote:
> > > > > >
> > > > > > > Hi Nicholas,
> > > > > > >
> > > > > > > Thanks for the reply. I had implemented a small PoC. It
> switches a
> > > > > > > configurable sequence of sources with predefined bounds. I'm
> using
> > > > the
> > > > > > > unmodified MockSource for illustration. It does not require a
> > > > > > "Switchable"
> > > > > > > interface. I looked at the code you shared and the delegation
> and
> > > > > > signaling
> > > > > > > works quite similar. That's a good validation.
> > > > > > >
> > > > > > > Hi Kezhu,
> > > > > > >
> > > > > > > Thanks for bringing the more detailed discussion regarding the
> > > > start/end
> > > > > > > position. I think in most cases the start and end positions
> will be
> > > > known
> > > > > > > when the job is submitted. If we take a File -> Kafka source
> chain
> > > as
> > > > > > > example, there would most likely be a timestamp at which we
> want to
> > > > > > > transition from files to reading from Kafka. So we would
> either set
> > > > the
> > > > > > > start position for Kafka based on that timestamp or provide the
> > > > offsets
> > > > > > > directly. (Note that I'm skipping a few related nuances here.
> In
> > > > order to
> > > > > > > achieve an exact switch without duplication or gap, we may also
> > > need
> > > > some
> > > > > > > overlap and filtering, but that's a separate issue.)
> > > > > > >
> > > > > > > The point is that the start positions can be configured by the
> > > user,
> > > > > > there
> > > > > > > is no need to transfer any information from one source to
> another
> > > as
> > > > part
> > > > > > > of switching.
> > > > > > >
> > > > > > > It gets more complicated if we want to achieve a dynamic switch
> > > > where the
> > > > > > > transition timestamp isn't known when the job starts. For
> example,
> > > > > > consider
> > > > > > > a bootstrap scenario where the time taken to process historic
> data
> > > > > > exceeds
> > > > > > > the Kafka retention. Here, we would need to dynamically
> resolve the
> > > > Kafka
> > > > > > > start position based on where the file readers left off, when
> the
> > > > > > switching
> > > > > > > occurs. The file source enumerator would determine at runtime
> when
> > > > it is
> > > > > > > done handing splits to its readers, maybe when the max file
> > > timestamp
> > > > > > > reaches (processing time - X). This information needs to be
> > > > transferred
> > > > > > to
> > > > > > > the Kafka source.
> > > > > > >
> > > > > > > The timestamp would need to be derived from the file enumerator
> > > > state,
> > > > > > > either by looking at the last splits or explicitly. The
> natural way
> > > > to do
> > > > > > > that is to introspect the enumerator state which gets
> checkpointed.
> > > > Any
> > > > > > > other form of "end position" via a special interface would
> need to
> > > be
> > > > > > > derived in the same manner.
> > > > > > >
> > > > > > > The converter that will be provided by the user would look at
> the
> > > > file
> > > > > > > enumerator state, derive the timestamp and then supply the
> "start
> > > > > > position"
> > > > > > > to the Kafka source. The Kafka source was created when the job
> > > > started.
> > > > > > It
> > > > > > > needs to be augmented with the new start position. That can be
> > > > achieved
> > > > > > via
> > > > > > > a special enumerator interface like
> > > > > > SwitchableSplitEnumerator#setStartState
> > > > > > > or by using restoreEnumerator with the checkpoint state
> constructed
> > > > by
> > > > > > the
> > > > > > > converter function. I'm leaning towards the latter as long as
> there
> > > > is a
> > > > > > > convenient way to construct the state from a position (like
> > > > > > > enumStateForTimestamp). The converter would map one enum state
> to
> > > > another
> > > > > > > and can be made very simple by providing a few utility
> functions
> > > > instead
> > > > > > of
> > > > > > > mandating a new interface that enumerators need to implement to
> > > > become
> > > > > > > switchable.
> > > > > > >
> > > > > > > Again, a converter is only required when sources need to be
> > > switched
> > > > > > based
> > > > > > > on positions not known at graph construction time.
> > > > > > >
> > > > > > > I'm planning to add such deferred switching to the PoC for
> > > > illustration
> > > > > > > and will share the experiment when that's done.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Thomas
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <
> programgeek@163.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Kezhu,
> > > > > > >>
> > > > > > >> Thanks for your detailed points for the Hybrid Source. I
> follow
> > > your
> > > > > > >> opinions and make a corresponding explanation as follows:
> > > > > > >>
> > > > > > >> 1.Would the Hybrid Source be possible to use this feature to
> > > > > > switch/chain
> > > > > > >> multiple homogeneous sources?
> > > > > > >>
> > > > > > >> "HybridSource" supports to switch/chain multiple homogeneous
> > > > sources,
> > > > > > >> which
> > > > > > >> have the respective implementation for "SwitchableSource" and
> > > > > > >> "SwitchableSplitEnumerator". "HybridSource" doesn't limit
> whether
> > > > the
> > > > > > >> Sources consisted is homogeneous. From the user's perspective,
> > > User
> > > > only
> > > > > > >> adds the "SwitchableSource" into "HybridSource" and leaves the
> > > > smooth
> > > > > > >> migration operation to "HybridSource".
> > > > > > >>
> > > > > > >> 2."setStartState" is actually a reposition operation for next
> > > > source to
> > > > > > >> start in job runtime?
> > > > > > >>
> > > > > > >> IMO, "setStartState" is used to determine the initial
> position of
> > > > the
> > > > > > new
> > > > > > >> source for smooth migration, not reposition operation. More
> > > > importantly,
> > > > > > >> the
> > > > > > >> "State" mentioned here refers to the start and end positions
> of
> > > > reading
> > > > > > >> source.
> > > > > > >>
> > > > > > >> 3.This conversion should be implementation detail of next
> source,
> > > > not
> > > > > > >> converter function in my opinion?
> > > > > > >>
> > > > > > >> The state conversion is of course an implementation detail and
> > > > included
> > > > > > in
> > > > > > >> the switching mechanism, that should provide users with the
> > > > conversion
> > > > > > >> interface for conversion, which is defined in converter
> function.
> > > > What's
> > > > > > >> more, when users has already implemented "SwitchableSource"
> and
> > > > added to
> > > > > > >> the
> > > > > > >> Hybrid Source, the users don't need to implement the
> > > > "SwitchableSource"
> > > > > > >> for
> > > > > > >> the different conversion. From the user's perspective, users
> could
> > > > > > define
> > > > > > >> the different converter functions and create the
> > > "SwitchableSource"
> > > > for
> > > > > > >> the
> > > > > > >> addition of "HybridSource", no need to implement a Source for
> the
> > > > > > >> converter
> > > > > > >> function.
> > > > > > >>
> > > > > > >> 4.No configurable start-position. In this situation
> combination of
> > > > above
> > > > > > >> three joints is a nop, and
> > > > > > >> "HybridSource" is a chain of start-position pre-configured
> > > sources?
> > > > > > >>
> > > > > > >> Indeed there is no configurable start-position, and this
> > > > configuration
> > > > > > >> could
> > > > > > >> be involved in the feature. Users could use
> > > > > > >> "SwitchableSplitEnumerator#setStartState" interface or the
> > > > configuration
> > > > > > >> parameters to configure start-position.
> > > > > > >>
> > > > > > >> 5.I am wonder whether end-position is a must and how it could
> be
> > > > useful
> > > > > > >> for
> > > > > > >> end users in a generic-enough source?
> > > > > > >>
> > > > > > >> "getEndState" interface is used for the smooth migration
> scenario,
> > > > which
> > > > > > >> could return null value if it is not needed. In the Hybrid
> Source
> > > > > > >> mechanism,
> > > > > > >> this interface is required for the switching between the
> sources
> > > > > > >> consisted,
> > > > > > >> otherwise there is no any way to get end-position of upstream
> > > > source. In
> > > > > > >> summary, Hybrid Source needs to be able to set the start
> position
> > > > and
> > > > > > get
> > > > > > >> the end position of each Source, otherwise there is no use to
> > > build
> > > > > > Hybrid
> > > > > > >> Source.
> > > > > > >>
> > > > > > >> 6.Is it possible for converter function to do blocking
> operations?
> > > > How
> > > > > > to
> > > > > > >> respond to checkpoint request when switching split enumerators
> > > cross
> > > > > > >> sources? Does end-position or start-position need to be
> stored in
> > > > > > >> checkpoint
> > > > > > >> state or not?
> > > > > > >>
> > > > > > >> The converter function only simply converts the state of
> upstream
> > > > source
> > > > > > >> to
> > > > > > >> the state of downstream source, not blocking operations. The
> way
> > > to
> > > > > > >> respond
> > > > > > >> the checkpoint request when switching split enumerators cross
> > > > sources is
> > > > > > >> send the corresponding "SourceEvent" to coordination. The
> > > > end-position
> > > > > > or
> > > > > > >> start-position don't need to be stored in checkpoint state,
> only
> > > > > > >> implements
> > > > > > >> the "getEndState" interface for end-position.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Nicholas Jiang
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> Sent from:
> > > > > > >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
>

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Hi Becket,

I agree and am not planning to hard-wire a specific combination of
sources (like S3 + Kafka). That also wouldn't help with the use case I
want to address, because there are customized connectors that we need
to be able to plug in.

Rather, the suggested simplification concerns the flexibility of the
switching mechanism.

The prototype already supports fixed start positions and checkpoint
conversion for any combination of sources; no need to undo that.

But for testing/example purposes, we will need to settle on a specific
combination.
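As one example combination, a File -> Kafka chain with a fixed switch point reduces to two bounded reads that share a user-configured timestamp. The following is a minimal plain-Java sketch under that assumption; it does not use Flink's Source API, and `readChain`, the switch timestamp and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: plain-Java stand-ins, not Flink's Source API.
// Each "source" is modelled as a list of record timestamps.
class FixedSwitchSketch {

    // Hypothetical bounded read: returns the records with fromTs <= ts < toTs.
    static List<Long> read(List<Long> records, long fromTs, long toTs) {
        List<Long> out = new ArrayList<>();
        for (long ts : records) {
            if (ts >= fromTs && ts < toTs) {
                out.add(ts);
            }
        }
        return out;
    }

    // Reads the file source up to switchTs (exclusive), then the Kafka source
    // from switchTs onwards. Both positions come from job configuration, so
    // nothing has to be handed from one source to the other at switch time.
    static List<Long> readChain(List<Long> files, List<Long> kafka, long switchTs) {
        List<Long> out = new ArrayList<>(read(files, Long.MIN_VALUE, switchTs));
        out.addAll(read(kafka, switchTs, Long.MAX_VALUE));
        return out;
    }

    public static void main(String[] args) {
        List<Long> files = List.of(1L, 2L, 3L, 4L); // historic data
        List<Long> kafka = List.of(3L, 4L, 5L, 6L); // retention overlaps the files
        System.out.println(readChain(files, kafka, 4L)); // prints [1, 2, 3, 4, 5, 6]
    }
}
```

Because the two inputs overlap, it is the filtering by the shared switch timestamp that avoids duplicates here; in practice an exact switch may need more careful overlap handling than this sketch shows.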

Thomas

On Sat, Apr 24, 2021 at 8:20 PM Becket Qin <be...@gmail.com> wrote:
>
> Sorry for the late reply. Starting from a specific connector sounds
> reasonable to me.
>
> That said, I would suggest to keep the possibility of future generalization
> as much as possible. We have already seen some variation of source
> combinations from different users, HDFS + Kafka, S3 + Kafka, S3 + SQL
> Binlog, etc. So it would be good if we can reuse some base abstraction in
> the future instead of having to write each combination from scratch.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <se...@apache.org> wrote:
>
> > Thanks, Thomas!
> >
> > @Becket and @Nicholas - would you be ok with that approach?
> >
> >
> > On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <th...@apache.org> wrote:
> >
> > > Hi Stephan,
> > >
> > > Thanks for the feedback!
> > >
> > > I agree with the approach of starting with a simple implementation
> > > that can address a well understood, significant portion of use cases.
> > >
> > > I'm planning to continue work on the prototype that I had shared.
> > > There is production level usage waiting for it fairly soon. I expect
> > > to open a PR in the coming weeks.
> > >
> > > Thomas
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > Thanks all for this discussion. Looks like there are lots of ideas and
> > > > folks that are eager to do things, so let's see how we can get this
> > > > moving.
> > > >
> > > > My take on this is the following:
> > > >
> > > > There will probably not be one Hybrid source, but possibly multiple
> > > > ones, because of different strategies/requirements.
> > > >     - One may be very simple, with switching points known up-front.
> > > > Would be good to have this in a very simple implementation.
> > > >     - There may be one where the switch is dynamic and the readers need
> > > > to report back where they left off.
> > > >     - There may be one that switches back and forth multiple times
> > > > during a job, for example Kafka going to DFS whenever it falls behind
> > > > retention, in order to catch up again.
> > > >
> > > > This also seems hard to "design on paper"; I expect there are nuances
> > > > in a production setup that affect some details of the design. So I'd
> > > > feel most comfortable in adding a variant of the hybrid source to Flink
> > > > that has been used already in a real use case (not necessarily in
> > > > production, but maybe in a testing/staging environment, so it seems to
> > > > meet all requirements).
> > > >
> > > >
> > > > What do you think about the following approach?
> > > >   - If there is a tested PoC, let's try to get it contributed to Flink
> > > > without trying to make it much more general.
> > > >   - When we see similar but a bit different requirements for another
> > > > hybrid source, then let's try to evolve the contributed one.
> > > >   - If we see new requirements that are so different that they don't
> > > > fit well with the existing hybrid source, then let us look at building
> > > > a second hybrid source for those requirements.
> > > >
> > > > We need to make connector contributions in general easier, and I think
> > > > it is not a bad thing to end up with different approaches and see how
> > > > these play out against each other when being used by users. For
> > > > example switching with known boundaries, dynamic switching,
> > > > back-and-forth-switching, etc.
> > > > (I know some committers are planning to do some work on making
> > > > connector contributions easier, with standardized testing frameworks,
> > > > decoupled CI, etc.)
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <th...@apache.org> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > As mentioned in my previous email, I had been working on a prototype
> > > > > for the hybrid source.
> > > > >
> > > > > You can find it at https://github.com/tweise/flink/pull/1
> > > > >
> > > > > It contains:
> > > > > * Switching with configurable chain of sources
> > > > > * Fixed or dynamic start positions
> > > > > * Test with MockSource and FileSource
> > > > >
> > > > > The purpose of the above PR is to gather feedback and help drive
> > > > > consensus on the FLIP.
> > > > >
> > > > > * How to support a dynamic start position within the source chain?
> > > > >
> > > > > Relevant in those (few?) cases where start positions are not known
> > > > > upfront. You can find an example of what that might look like in the
> > > > > tests:
> > > > >
> > > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > > >
> > > > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > > >
> > > > > When switching, the enumerator of the previous source needs to
> > > > > supply information about consumed splits that allows setting the
> > > > > start position for the next source. That could be something like the
> > > > > last processed file, timestamp, etc. (Currently
> > > > > StaticFileSplitEnumerator doesn't track finished splits.)
> > > > >
> > > > > See previous discussion regarding start/end position. The prototype
> > > > > shows the use of checkpoint state with converter function.
> > > > >
> > > > > * Should readers be deployed dynamically?
> > > > >
> > > > > The prototype assumes a static source chain that is fixed at job
> > > > > submission time. Conceivably there could be use cases that require
> > > > > more flexibility, such as switching one KafkaSource for another. A
> > > > > step in that direction would be to deploy the actual readers
> > > > > dynamically, at the time of switching source.
> > > > >
> > > > > Looking forward to feedback and suggestions for next steps!
> > > > >
> > > > > Thomas
> > > > >
> > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <th...@apache.org> wrote:
> > > > >
> > > > > > Hi Nicholas,
> > > > > >
> > > > > > Thanks for the reply. I had implemented a small PoC. It switches a
> > > > > > configurable sequence of sources with predefined bounds. I'm using
> > > > > > the unmodified MockSource for illustration. It does not require a
> > > > > > "Switchable" interface. I looked at the code you shared and the
> > > > > > delegation and signaling work quite similarly. That's a good
> > > > > > validation.
> > > > > >
> > > > > > Hi Kezhu,
> > > > > >
> > > > > > Thanks for bringing the more detailed discussion regarding the
> > > > > > start/end position. I think in most cases the start and end
> > > > > > positions will be known when the job is submitted. If we take a
> > > > > > File -> Kafka source chain as an example, there would most likely
> > > > > > be a timestamp at which we want to transition from files to reading
> > > > > > from Kafka. So we would either set the start position for Kafka
> > > > > > based on that timestamp or provide the offsets directly. (Note that
> > > > > > I'm skipping a few related nuances here. In order to achieve an
> > > > > > exact switch without duplication or gap, we may also need some
> > > > > > overlap and filtering, but that's a separate issue.)
> > > > > >
> > > > > > The point is that the start positions can be configured by the
> > > > > > user; there is no need to transfer any information from one source
> > > > > > to another as part of switching.
> > > > > >
> > > > > > It gets more complicated if we want to achieve a dynamic switch
> > > > > > where the transition timestamp isn't known when the job starts. For
> > > > > > example, consider a bootstrap scenario where the time taken to
> > > > > > process historic data exceeds the Kafka retention. Here, we would
> > > > > > need to dynamically resolve the Kafka start position based on where
> > > > > > the file readers left off, when the switching occurs. The file
> > > > > > source enumerator would determine at runtime when it is done
> > > > > > handing splits to its readers, maybe when the max file timestamp
> > > > > > reaches (processing time - X). This information needs to be
> > > > > > transferred to the Kafka source.
> > > > > >
> > > > > > The timestamp would need to be derived from the file enumerator
> > > > > > state, either by looking at the last splits or explicitly. The
> > > > > > natural way to do that is to introspect the enumerator state which
> > > > > > gets checkpointed. Any other form of "end position" via a special
> > > > > > interface would need to be derived in the same manner.
> > > > > >
> > > > > > The converter that will be provided by the user would look at the
> > > > > > file enumerator state, derive the timestamp and then supply the
> > > > > > "start position" to the Kafka source. The Kafka source was created
> > > > > > when the job started. It needs to be augmented with the new start
> > > > > > position. That can be achieved via a special enumerator interface
> > > > > > like SwitchableSplitEnumerator#setStartState or by using
> > > > > > restoreEnumerator with the checkpoint state constructed by the
> > > > > > converter function. I'm leaning towards the latter as long as there
> > > > > > is a convenient way to construct the state from a position (like
> > > > > > enumStateForTimestamp). The converter would map one enum state to
> > > > > > another and can be made very simple by providing a few utility
> > > > > > functions instead of mandating a new interface that enumerators
> > > > > > need to implement to become switchable.
> > > > > >
> > > > > > Again, a converter is only required when sources need to be
> > > > > > switched based on positions not known at graph construction time.
> > > > > >
> > > > > > I'm planning to add such deferred switching to the PoC for
> > > > > > illustration and will share the experiment when that's done.
> > > > > >
> > > > > > Cheers,
> > > > > > Thomas
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programgeek@163.com> wrote:
> > > > > >
> > > > > > >> Hi Kezhu,
> > > > > > >>
> > > > > > >> Thanks for your detailed points on the Hybrid Source. I have
> > > > > > >> followed your points and give corresponding explanations below:
> > > > > > >>
> > > > > > >> 1. Would it be possible to use the Hybrid Source feature to
> > > > > > >> switch/chain multiple homogeneous sources?
> > > > > > >>
> > > > > > >> "HybridSource" supports switching/chaining multiple homogeneous
> > > > > > >> sources, each of which has its own implementation of
> > > > > > >> "SwitchableSource" and "SwitchableSplitEnumerator". "HybridSource"
> > > > > > >> doesn't restrict whether its constituent sources are homogeneous.
> > > > > > >> From the user's perspective, the user only adds the
> > > > > > >> "SwitchableSource" to "HybridSource" and leaves the smooth
> > > > > > >> migration to "HybridSource".
> > > > > > >>
> > > > > > >> 2. Is "setStartState" actually a reposition operation for the
> > > > > > >> next source to start at job runtime?
> > > > > > >>
> > > > > > >> IMO, "setStartState" is used to determine the initial position of
> > > > > > >> the new source for smooth migration; it is not a reposition
> > > > > > >> operation. More importantly, the "State" mentioned here refers to
> > > > > > >> the start and end positions of reading the source.
> > > > > > >>
> > > > > > >> 3. This conversion should be an implementation detail of the next
> > > > > > >> source, not a converter function, in my opinion?
> > > > > > >>
> > > > > > >> The state conversion is of course an implementation detail and is
> > > > > > >> part of the switching mechanism, which should provide users with
> > > > > > >> a conversion interface, defined by the converter function. What's
> > > > > > >> more, once users have implemented "SwitchableSource" and added it
> > > > > > >> to the Hybrid Source, they don't need to implement
> > > > > > >> "SwitchableSource" again for a different conversion. From the
> > > > > > >> user's perspective, users could define different converter
> > > > > > >> functions and create the "SwitchableSource" to add to
> > > > > > >> "HybridSource"; there is no need to implement a Source for the
> > > > > > >> converter function.
> > > > > > >>
> > > > > > >> 4. No configurable start-position. In this situation the
> > > > > > >> combination of the above three joints is a no-op, and
> > > > > > >> "HybridSource" is a chain of sources with pre-configured start
> > > > > > >> positions?
> > > > > > >>
> > > > > > >> Indeed there is no configurable start-position, and this
> > > > > > >> configuration could be included in the feature. Users could use
> > > > > > >> the "SwitchableSplitEnumerator#setStartState" interface or
> > > > > > >> configuration parameters to configure the start-position.
> > > > > > >>
> > > > > > >> 5. I wonder whether end-position is a must and how it could be
> > > > > > >> useful for end users in a generic-enough source?
> > > > > > >>
> > > > > > >> The "getEndState" interface is used for the smooth migration
> > > > > > >> scenario and could return null if it is not needed. In the Hybrid
> > > > > > >> Source mechanism, this interface is required for switching
> > > > > > >> between the constituent sources; otherwise there is no way to get
> > > > > > >> the end-position of the upstream source. In summary, the Hybrid
> > > > > > >> Source needs to be able to set the start position and get the end
> > > > > > >> position of each Source; otherwise there is no point in building
> > > > > > >> a Hybrid Source.
> > > > > > >>
> > > > > > >> 6. Is it possible for the converter function to do blocking
> > > > > > >> operations? How to respond to a checkpoint request when switching
> > > > > > >> split enumerators across sources? Does the end-position or
> > > > > > >> start-position need to be stored in checkpoint state or not?
> > > > > > >>
> > > > > > >> The converter function simply converts the state of the upstream
> > > > > > >> source to the state of the downstream source; it does not perform
> > > > > > >> blocking operations. The way to respond to the checkpoint request
> > > > > > >> when switching split enumerators across sources is to send the
> > > > > > >> corresponding "SourceEvent" for coordination. The end-position or
> > > > > > >> start-position doesn't need to be stored in checkpoint state; only
> > > > > > >> the "getEndState" interface needs to be implemented for the
> > > > > > >> end-position.
> > > > > >>
> > > > > >> Best,
> > > > > >> Nicholas Jiang
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Sent from:
> > > > > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > > >>
> > > > > >
> > > > >
> > >
> >
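The deferred switch described in the quoted March 14 message, where a user-supplied converter derives the switch timestamp from the file enumerator's checkpointed state and turns it into a Kafka start state, can be sketched in plain Java. Everything here is hypothetical: `FileEnumState`, `KafkaEnumState`, `fileEnumeratorDone` and `CONVERTER` are illustrative stand-ins, and `enumStateForTimestamp` only borrows the name mentioned in the thread; none of these are Flink APIs.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for checkpointed enumerator state. The real
// FileSource and KafkaSource enumerator states in Flink look different.
class DeferredSwitchSketch {

    // A file split the readers have finished, with the max record timestamp seen.
    static final class FinishedSplit {
        final String path;
        final long maxTimestamp;

        FinishedSplit(String path, long maxTimestamp) {
            this.path = path;
            this.maxTimestamp = maxTimestamp;
        }
    }

    // Enumerator state of the (hypothetical) file source at the switch point.
    static final class FileEnumState {
        final List<FinishedSplit> finishedSplits;

        FileEnumState(List<FinishedSplit> finishedSplits) {
            this.finishedSplits = finishedSplits;
        }
    }

    // Enumerator state of the (hypothetical) Kafka source: start by timestamp.
    static final class KafkaEnumState {
        final long startTimestamp;

        KafkaEnumState(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }
    }

    // The file enumerator stops handing out splits once the newest file is
    // within lagMillis of processing time, i.e. maxFileTs >= now - X.
    static boolean fileEnumeratorDone(long maxFileTs, long nowMillis, long lagMillis) {
        return maxFileTs >= nowMillis - lagMillis;
    }

    // Utility in the spirit of "enumStateForTimestamp" from the thread; the
    // name and signature are assumptions made for this sketch.
    static KafkaEnumState enumStateForTimestamp(long timestamp) {
        return new KafkaEnumState(timestamp);
    }

    // User-supplied converter: derive the switch timestamp from where the file
    // readers left off and build the Kafka start state from it. An exact switch
    // (no duplicates, no gap) may need overlap and filtering, ignored here.
    static final Function<FileEnumState, KafkaEnumState> CONVERTER = state -> {
        long maxTs = Long.MIN_VALUE;
        for (FinishedSplit split : state.finishedSplits) {
            maxTs = Math.max(maxTs, split.maxTimestamp);
        }
        if (maxTs == Long.MIN_VALUE) {
            throw new IllegalStateException("no finished splits to derive a position from");
        }
        return enumStateForTimestamp(maxTs + 1);
    };

    public static void main(String[] args) {
        FileEnumState fileState = new FileEnumState(List.of(
            new FinishedSplit("part-0", 100L), new FinishedSplit("part-1", 250L)));
        System.out.println(CONVERTER.apply(fileState).startTimestamp); // prints 251
    }
}
```

Keeping the converter a plain function from one enumerator state to another mirrors the preference stated in the thread for a few utility functions over a new interface that every enumerator would have to implement.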

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Becket Qin <be...@gmail.com>.
Sorry for the late reply. Starting from a specific connector sounds
reasonable to me.

That said, I would suggest preserving the possibility of future
generalization as much as possible. We have already seen several source
combinations from different users, e.g. HDFS + Kafka, S3 + Kafka, and
S3 + SQL binlog. So it would be good if we could reuse some base
abstraction in the future instead of having to write each combination from
scratch.
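One way to leave room for that generalization, sketched here under the assumption that every source can be reduced to "start from a position, report where it ended", is to write the switching logic once and let each combination plug in its stages and position converters. `Stage` and `HybridChain` are hypothetical names, not Flink interfaces, and the lambdas in the demo only simulate reading.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical abstraction for illustration; these are not Flink interfaces.
// A stage starts reading from a position of type P and returns where it ended.
@FunctionalInterface
interface Stage<P> {
    P run(P startPosition);
}

// The switching logic is written once; a concrete combination such as
// HDFS + Kafka or S3 + Kafka only supplies its stages and position converters.
final class HybridChain<P> {
    private final Supplier<P> pipeline;
    private final List<String> names;

    private HybridChain(Supplier<P> pipeline, List<String> names) {
        this.pipeline = pipeline;
        this.names = names;
    }

    // First stage, starting from a position fixed up-front (e.g. configuration).
    static <P> HybridChain<P> first(String name, Stage<P> stage, P start) {
        return new HybridChain<>(() -> stage.run(start), List.of(name));
    }

    // Appends a stage whose start position is derived from the previous
    // stage's end position, possibly of a different type.
    <Q> HybridChain<Q> then(String name, Stage<Q> stage, Function<P, Q> converter) {
        Supplier<P> previous = pipeline;
        List<String> extended = new ArrayList<>(names);
        extended.add(name);
        return new HybridChain<>(
            () -> stage.run(converter.apply(previous.get())),
            Collections.unmodifiableList(extended));
    }

    List<String> names() {
        return names;
    }

    // Runs all stages in order and returns the last stage's end position.
    // (In a real job the last stage would typically be unbounded.)
    P execute() {
        return pipeline.get();
    }
}

class HybridChainDemo {
    public static void main(String[] args) {
        HybridChain<String> chain = HybridChain
            .first("hdfs", (Long fromTs) -> fromTs + 100, 0L)
            .then("kafka", (String pos) -> pos + " (caught up)", ts -> "timestamp=" + ts);
        System.out.println(chain.names() + " " + chain.execute());
    }
}
```

This only models a fixed, one-directional sequence of sources; dynamic or back-and-forth switching as discussed in the thread would need more than a linear chain.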

Thanks,

Jiangjie (Becket) Qin

On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <se...@apache.org> wrote:

> Thanks, Thomas!
>
> @Becket and @Nicholas - would you be ok with that approach?
>
>
> On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <th...@apache.org> wrote:
>
> > Hi Stephan,
> >
> > Thanks for the feedback!
> >
> > I agree with the approach of starting with a simple implementation
> > that can address a well understood, significant portion of use cases.
> >
> > I'm planning to continue work on the prototype that I had shared.
> > There is production level usage waiting for it fairly soon. I expect
> > to open a PR in the coming weeks.
> >
> > Thomas
> >
> >
> >
> >
> >
> > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > Thanks all for this discussion. Looks like there are lots of ideas and
> > > folks that are eager to do things, so let's see how we can get this
> > > moving.
> > >
> > > My take on this is the following:
> > >
> > > There will probably not be one Hybrid source, but possibly multiple
> > > ones, because of different strategies/requirements.
> > >     - One may be very simple, with switching points known up-front.
> > > Would be good to have this in a very simple implementation.
> > >     - There may be one where the switch is dynamic and the readers need
> > > to report back where they left off.
> > >     - There may be one that switches back and forth multiple times
> > > during a job, for example Kafka going to DFS whenever it falls behind
> > > retention, in order to catch up again.
> > >
> > > This also seems hard to "design on paper"; I expect there are nuances
> > > in a production setup that affect some details of the design. So I'd
> > > feel most comfortable in adding a variant of the hybrid source to Flink
> > > that has been used already in a real use case (not necessarily in
> > > production, but maybe in a testing/staging environment, so it seems to
> > > meet all requirements).
> > >
> > >
> > > What do you think about the following approach?
> > >   - If there is a tested PoC, let's try to get it contributed to Flink
> > > without trying to make it much more general.
> > >   - When we see similar but a bit different requirements for another
> > > hybrid source, then let's try to evolve the contributed one.
> > >   - If we see new requirements that are so different that they don't
> > > fit well with the existing hybrid source, then let us look at building
> > > a second hybrid source for those requirements.
> > >
> > > We need to make connector contributions in general easier, and I think
> > > it is not a bad thing to end up with different approaches and see how
> > > these play out against each other when being used by users. For
> > > example switching with known boundaries, dynamic switching,
> > > back-and-forth-switching, etc.
> > > (I know some committers are planning to do some work on making
> > > connector contributions easier, with standardized testing frameworks,
> > > decoupled CI, etc.)
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <th...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > As mentioned in my previous email, I had been working on a prototype
> > for
> > > > the hybrid source.
> > > >
> > > > You can find it at https://github.com/tweise/flink/pull/1
> > > >
> > > > It contains:
> > > > * Switching with configurable chain of sources
> > > > * Fixed or dynamic start positions
> > > > * Test with MockSource and FileSource
> > > >
> > > > The purpose of the above PR is to gather feedback and help drive
> > consensus
> > > > on the FLIP.
> > > >
> > > > * How to support a dynamic start position within the source chain?
> > > >
> > > > Relevant in those (few?) cases where start positions are not known
> > upfront.
> > > > You can find an example of what that might look like in the tests:
> > > >
> > > >
> > > >
> >
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > >
> > > >
> >
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > >
> > > > When switching, the enumerator of the previous source needs to
> > > > supply information about consumed splits that allows to set the start
> > > > position for the next source. That could be something like the last
> > > > processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> > > > doesn't track finished splits.)
> > > >
> > > > See previous discussion regarding start/end position. The prototype
> > shows
> > > > the use of checkpoint state with converter function.
> > > >
> > > > * Should readers be deployed dynamically?
> > > >
> > > > The prototype assumes a static source chain that is fixed at job
> > submission
> > > > time. Conceivably there could be use cases that require more
> > flexibility.
> > > > Such as switching one KafkaSource for another. A step in that
> direction
> > > > would be to deploy the actual readers dynamically, at the time of
> > switching
> > > > source.
> > > >
> > > > Looking forward to feedback and suggestions for next steps!
> > > >
> > > > Thomas
> > > >
> > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <th...@apache.org>
> wrote:
> > > >
> > > > > Hi Nicholas,
> > > > >
> > > > > Thanks for the reply. I had implemented a small PoC. It switches a
> > > > > configurable sequence of sources with predefined bounds. I'm using
> > the
> > > > > unmodified MockSource for illustration. It does not require a
> > > > "Switchable"
> > > > > interface. I looked at the code you shared and the delegation and
> > > > signaling
> > > > > works quite similar. That's a good validation.
> > > > >
> > > > > Hi Kezhu,
> > > > >
> > > > > Thanks for bringing the more detailed discussion regarding the
> > start/end
> > > > > position. I think in most cases the start and end positions will be
> > known
> > > > > when the job is submitted. If we take a File -> Kafka source chain
> as
> > > > > example, there would most likely be a timestamp at which we want to
> > > > > transition from files to reading from Kafka. So we would either set
> > the
> > > > > start position for Kafka based on that timestamp or provide the
> > offsets
> > > > > directly. (Note that I'm skipping a few related nuances here. In
> > order to
> > > > > achieve an exact switch without duplication or gap, we may also
> need
> > some
> > > > > overlap and filtering, but that's a separate issue.)
> > > > >
> > > > > The point is that the start positions can be configured by the
> user,
> > > > there
> > > > > is no need to transfer any information from one source to another
> as
> > part
> > > > > of switching.
> > > > >
> > > > > It gets more complicated if we want to achieve a dynamic switch
> > where the
> > > > > transition timestamp isn't known when the job starts. For example,
> > > > consider
> > > > > a bootstrap scenario where the time taken to process historic data
> > > > exceeds
> > > > > the Kafka retention. Here, we would need to dynamically resolve the
> > Kafka
> > > > > start position based on where the file readers left off, when the
> > > > switching
> > > > > occurs. The file source enumerator would determine at runtime when
> > it is
> > > > > done handing splits to its readers, maybe when the max file
> timestamp
> > > > > reaches (processing time - X). This information needs to be
> > transferred
> > > > to
> > > > > the Kafka source.
> > > > >
> > > > > The timestamp would need to be derived from the file enumerator
> > state,
> > > > > either by looking at the last splits or explicitly. The natural way
> > to do
> > > > > that is to introspect the enumerator state which gets checkpointed.
> > Any
> > > > > other form of "end position" via a special interface would need to
> be
> > > > > derived in the same manner.
> > > > >
> > > > > The converter that will be provided by the user would look at the
> > file
> > > > > enumerator state, derive the timestamp and then supply the "start
> > > > position"
> > > > > to the Kafka source. The Kafka source was created when the job
> > started.
> > > > It
> > > > > needs to be augmented with the new start position. That can be
> > achieved
> > > > via
> > > > > a special enumerator interface like
> > > > SwitchableSplitEnumerator#setStartState
> > > > > or by using restoreEnumerator with the checkpoint state constructed
> > by
> > > > the
> > > > > converter function. I'm leaning towards the latter as long as there
> > is a
> > > > > convenient way to construct the state from a position (like
> > > > > enumStateForTimestamp). The converter would map one enum state to
> > another
> > > > > and can be made very simple by providing a few utility functions
> > instead
> > > > of
> > > > > mandating a new interface that enumerators need to implement to
> > become
> > > > > switchable.
> > > > >
> > > > > Again, a converter is only required when sources need to be
> switched
> > > > based
> > > > > on positions not known at graph construction time.
> > > > >
> > > > > I'm planning to add such deferred switching to the PoC for
> > illustration
> > > > > and will share the experiment when that's done.
> > > > >
> > > > > Cheers,
> > > > > Thomas
> > > > >
> > > > >
> > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programgeek@163.com
> >
> > > > wrote:
> > > > >
> > > > >> Hi Kezhu,
> > > > >>
> > > > >> Thanks for your detailed points for the Hybrid Source. I follow
> your
> > > > >> opinions and make a corresponding explanation as follows:
> > > > >>
> > > > >> 1.Would the Hybrid Source be possible to use this feature to
> > > > switch/chain
> > > > >> multiple homogeneous sources?
> > > > >>
> > > > >> "HybridSource" supports to switch/chain multiple homogeneous
> > sources,
> > > > >> which
> > > > >> have the respective implementation for "SwitchableSource" and
> > > > >> "SwitchableSplitEnumerator". "HybridSource" doesn't limit whether
> > the
> > > > >> Sources consisted is homogeneous. From the user's perspective,
> User
> > only
> > > > >> adds the "SwitchableSource" into "HybridSource" and leaves the
> > smooth
> > > > >> migration operation to "HybridSource".
> > > > >>
> > > > >> 2."setStartState" is actually a reposition operation for next
> > source to
> > > > >> start in job runtime?
> > > > >>
> > > > >> IMO, "setStartState" is used to determine the initial position of
> > the
> > > > new
> > > > >> source for smooth migration, not reposition operation. More
> > importantly,
> > > > >> the
> > > > >> "State" mentioned here refers to the start and end positions of
> > reading
> > > > >> source.
> > > > >>
> > > > >> 3.This conversion should be implementation detail of next source,
> > not
> > > > >> converter function in my opinion?
> > > > >>
> > > > >> The state conversion is of course an implementation detail and
> > included
> > > > in
> > > > >> the switching mechanism, that should provide users with the
> > conversion
> > > > >> interface for conversion, which is defined in converter function.
> > What's
> > > > >> more, when users has already implemented "SwitchableSource" and
> > added to
> > > > >> the
> > > > >> Hybrid Source, the users don't need to implement the
> > "SwitchableSource"
> > > > >> for
> > > > >> the different conversion. From the user's perspective, users could
> > > > define
> > > > >> the different converter functions and create the
> "SwitchableSource"
> > for
> > > > >> the
> > > > >> addition of "HybridSource", no need to implement a Source for the
> > > > >> converter
> > > > >> function.
> > > > >>
> > > > >> 4.No configurable start-position. In this situation combination of
> > above
> > > > >> three joints is a nop, and
> > > > >> "HybridSource" is a chain of start-position pre-configured
> sources?
> > > > >>
> > > > >> Indeed there is no configurable start-position, and this
> > configuration
> > > > >> could
> > > > >> be involved in the feature. Users could use
> > > > >> "SwitchableSplitEnumerator#setStartState" interface or the
> > configuration
> > > > >> parameters to configure start-position.
> > > > >>
> > > > >> 5.I am wonder whether end-position is a must and how it could be
> > useful
> > > > >> for
> > > > >> end users in a generic-enough source?
> > > > >>
> > > > >> "getEndState" interface is used for the smooth migration scenario,
> > which
> > > > >> could return null value if it is not needed. In the Hybrid Source
> > > > >> mechanism,
> > > > >> this interface is required for the switching between the sources
> > > > >> consisted,
> > > > >> otherwise there is no any way to get end-position of upstream
> > source. In
> > > > >> summary, Hybrid Source needs to be able to set the start position
> > and
> > > > get
> > > > >> the end position of each Source, otherwise there is no use to
> build
> > > > Hybrid
> > > > >> Source.
> > > > >>
> > > > >> 6.Is it possible for converter function to do blocking operations?
> > How
> > > > to
> > > > >> respond to checkpoint request when switching split enumerators
> cross
> > > > >> sources? Does end-position or start-position need to be stored in
> > > > >> checkpoint
> > > > >> state or not?
> > > > >>
> > > > >> The converter function only simply converts the state of upstream
> > source
> > > > >> to
> > > > >> the state of downstream source, not blocking operations. The way
> to
> > > > >> respond
> > > > >> the checkpoint request when switching split enumerators cross
> > sources is
> > > > >> send the corresponding "SourceEvent" to coordination. The
> > end-position
> > > > or
> > > > >> start-position don't need to be stored in checkpoint state, only
> > > > >> implements
> > > > >> the "getEndState" interface for end-position.
> > > > >>
> > > > >> Best,
> > > > >> Nicholas Jiang
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Sent from:
> > > > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > >>
> > > > >
> > > >
> >
>

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Stephan Ewen <se...@apache.org>.
Thanks, Thomas!

@Becket and @Nicholas - would you be ok with that approach?


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Posted by Thomas Weise <th...@apache.org>.
Hi Stephan,

Thanks for the feedback!

I agree with the approach of starting with a simple implementation
that can address a well understood, significant portion of use cases.

I'm planning to continue work on the prototype that I had shared.
There is production level usage waiting for it fairly soon. I expect
to open a PR in the coming weeks.

Thomas





On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
>
> Thanks all for this discussion. Looks like there are lots of ideas and
> folks that are eager to do things, so let's see how we can get this moving.
>
> My take on this is the following:
>
> There will probably not be one hybrid source, but possibly multiple ones,
> because of different strategies/requirements.
>     - One may be very simple, with switching points known up-front. It would
> be good to have this in a very simple implementation.
>     - There may be one where the switch is dynamic and the readers need to
> report back where they left off.
>     - There may be one that switches back and forth multiple times during a
> job, for example going from Kafka to DFS whenever the reader falls behind
> retention, in order to catch up again.
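To make the third, back-and-forth strategy in the list above more concrete, here is a minimal sketch of a switching decision that is re-evaluated per position, next to the simpler fixed-boundary one. It assumes that positions are comparable across both systems (for example, a DFS archive that stores the Kafka log keyed by the same offsets); SwitchPolicy, Backend, FixedBoundary and BackAndForth are hypothetical names for illustration only.

```java
// Hypothetical model; assumes DFS and Kafka positions share one offset space.
enum Backend { DFS, KAFKA }

interface SwitchPolicy {
    // Decide which system should serve the record at position `next`,
    // given the earliest position Kafka still retains.
    Backend choose(long next, long earliestRetainedInKafka);
}

// Strategy 1: the switch point is known up-front and never revisited.
final class FixedBoundary implements SwitchPolicy {
    private final long boundary;

    FixedBoundary(long boundary) {
        this.boundary = boundary;
    }

    @Override
    public Backend choose(long next, long earliestRetainedInKafka) {
        return next < boundary ? Backend.DFS : Backend.KAFKA;
    }
}

// Strategy 3: read from Kafka while possible, fall back to DFS whenever
// retention has already dropped the position we need, then come back.
final class BackAndForth implements SwitchPolicy {
    @Override
    public Backend choose(long next, long earliestRetainedInKafka) {
        return next < earliestRetainedInKafka ? Backend.DFS : Backend.KAFKA;
    }
}
```

The dynamic variant (strategy 2) would differ mainly in that the boundary is not a constructor argument but is reported by the readers at runtime.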
>
> This also seems hard to "design on paper"; I expect there are nuances in a
> production setup that affect some details of the design. So I'd feel most
> comfortable adding a variant of the hybrid source to Flink that has already
> been used in a real use case (not necessarily in production, but maybe in a
> testing/staging environment, so that we know it meets the requirements).
>
>
> What do you think about the following approach?
>   - If there is a tested PoC, let's try to get it contributed to Flink
> without trying to make it much more general.
>   - When we see similar but a bit different requirements for another hybrid
> source, then let's try to evolve the contributed one.
>   - If we see new requirements that are so different that they don't fit
> well with the existing hybrid source, then let us look at building a second
> hybrid source for those requirements.
>
> We need to make connector contributions easier in general, and I think
> it is not a bad thing to end up with different approaches and see how these
> play out against each other in practice. For example switching
> with known boundaries, dynamic switching, back-and-forth-switching, etc.
> (I know some committers are planning to do some work on making
> connector contributions easier, with standardized testing frameworks,
> decoupled CI, etc.)
>
> Best,
> Stephan
>
>
> On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <th...@apache.org> wrote:
>
> > Hi,
> >
> > As mentioned in my previous email, I have been working on a prototype for
> > the hybrid source.
> >
> > You can find it at https://github.com/tweise/flink/pull/1
> >
> > It contains:
> > * Switching across a configurable chain of sources
> > * Fixed or dynamic start positions
> > * Test with MockSource and FileSource
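The "fixed or dynamic start positions" item can be pictured as follows: each entry in the chain either carries a start position known at submission time, or a function that derives its start from where the previous source ended. This is a simplified model, not the prototype's code; ChainEntry, StartPositionResolver and the length field (standing in for how far a bounded source reads before it finishes) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical chain entry: either a fixed start or one derived at switch time.
final class ChainEntry {
    final String name;
    final Long fixedStart;                // non-null: start known at job submission
    final LongUnaryOperator fromPrevEnd;  // used only when fixedStart is null
    final long length;                    // stand-in for how far this bounded source reads

    private ChainEntry(String name, Long fixedStart, LongUnaryOperator fromPrevEnd, long length) {
        this.name = name;
        this.fixedStart = fixedStart;
        this.fromPrevEnd = fromPrevEnd;
        this.length = length;
    }

    static ChainEntry fixed(String name, long start, long length) {
        return new ChainEntry(name, start, null, length);
    }

    static ChainEntry dynamic(String name, LongUnaryOperator fromPrevEnd, long length) {
        return new ChainEntry(name, null, fromPrevEnd, length);
    }
}

final class StartPositionResolver {
    // Resolves the start of every entry in order. A dynamic entry can only be
    // resolved once the previous entry has finished and its end is known.
    static List<Long> resolveStarts(List<ChainEntry> chain) {
        List<Long> starts = new ArrayList<>();
        long prevEnd = 0;
        for (int i = 0; i < chain.size(); i++) {
            ChainEntry e = chain.get(i);
            long start;
            if (e.fixedStart != null) {
                start = e.fixedStart;
            } else if (i == 0) {
                throw new IllegalArgumentException("first source needs a fixed start: " + e.name);
            } else {
                start = e.fromPrevEnd.applyAsLong(prevEnd);
            }
            starts.add(start);
            prevEnd = start + e.length;
        }
        return starts;
    }
}
```

With fixed starts nothing flows between sources; with a dynamic start, the previous source's end position has to be available at switch time, which is the hand-off the prototype explores.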
> >
> > The purpose of the above PR is to gather feedback and help drive consensus
> > on the FLIP.
> >
> > * How to support a dynamic start position within the source chain?
> >
> > This is relevant in those (few?) cases where start positions are not known
> > upfront.
> > You can find an example of what that might look like in the tests:
> >
> >
> > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> >
> > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> >
> > When switching, the enumerator of the previous source needs to
> > supply information about consumed splits that allows the start
> > position of the next source to be set. That could be something like the
> > last processed file, a timestamp, etc. (Currently StaticFileSplitEnumerator
> > doesn't track finished splits.)
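Since StaticFileSplitEnumerator does not track finished splits, a file enumerator usable in such a chain would presumably need to record them. A minimal sketch under that assumption: the enumerator remembers each split a reader reports as finished, so the hand-off information (last processed file, latest timestamp) can be read off when switching. FileSplitInfo and TrackingFileEnumerator are hypothetical, and completion is assumed to be reported through a direct method call rather than a Flink source event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical split descriptor: a file plus the newest event timestamp it contains.
final class FileSplitInfo {
    final String path;
    final long maxTimestamp;

    FileSplitInfo(String path, long maxTimestamp) {
        this.path = path;
        this.maxTimestamp = maxTimestamp;
    }
}

// Hypothetical enumerator that, unlike a purely static one, remembers finished splits.
final class TrackingFileEnumerator {
    private final Deque<FileSplitInfo> pending;
    private final List<FileSplitInfo> finished = new ArrayList<>();

    TrackingFileEnumerator(List<FileSplitInfo> splits) {
        this.pending = new ArrayDeque<>(splits);
    }

    // Hands the next split to a reader, if any remain.
    Optional<FileSplitInfo> assignNext() {
        return Optional.ofNullable(pending.pollFirst());
    }

    // Called when a reader reports that it has fully consumed a split.
    void onSplitFinished(FileSplitInfo split) {
        finished.add(split);
    }

    // Hand-off info: the newest timestamp seen in any finished split.
    Optional<Long> endTimestamp() {
        return finished.stream().map(s -> s.maxTimestamp).max(Long::compare);
    }

    // Hand-off info: the file whose completion was reported last.
    Optional<String> lastProcessedFile() {
        return finished.isEmpty()
                ? Optional.empty()
                : Optional.of(finished.get(finished.size() - 1).path);
    }
}
```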
> >
> > See the previous discussion regarding start/end position. The prototype
> > shows the use of checkpoint state with a converter function.
> >
> > * Should readers be deployed dynamically?
> >
> > The prototype assumes a static source chain that is fixed at job submission
> > time. Conceivably there could be use cases that require more flexibility,
> > such as switching one KafkaSource for another. A step in that direction
> > would be to deploy the actual readers dynamically, at the time the source
> > is switched.
> >
> > Looking forward to feedback and suggestions for next steps!
> >
> > Thomas
> >
> > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <th...@apache.org> wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for the reply. I have implemented a small PoC. It switches a
> > > configurable sequence of sources with predefined bounds. I'm using the
> > > unmodified MockSource for illustration. It does not require a
> > > "Switchable" interface. I looked at the code you shared, and the
> > > delegation and signaling work quite similarly. That's a good validation.
> > >
> > > Hi Kezhu,
> > >
> > > Thanks for bringing up the more detailed discussion regarding the
> > > start/end position. I think in most cases the start and end positions
> > > will be known when the job is submitted. If we take a File -> Kafka
> > > source chain as an example, there would most likely be a timestamp at
> > > which we want to transition from files to reading from Kafka. So we would
> > > either set the start position for Kafka based on that timestamp or
> > > provide the offsets directly. (Note that I'm skipping a few related
> > > nuances here. In order to achieve an exact switch without duplication or
> > > gap, we may also need some overlap and filtering, but that's a separate
> > > issue.)
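The "overlap and filtering" nuance for a File -> Kafka switch at a known timestamp T could look roughly like this: Kafka is started somewhat before T, and each record is kept only by the side that owns its timestamp, so records in the overlap are neither lost nor emitted twice. This sketch assumes that every record carries an event timestamp, that the files contain everything before T, and that the chosen overlap is large enough to cover out-of-order records around T; Event and TimestampHandoff are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record: a logical id plus its event timestamp.
final class Event {
    final String id;
    final long ts;

    Event(String id, long ts) {
        this.id = id;
        this.ts = ts;
    }
}

// Files own timestamps < switchTs; Kafka owns timestamps >= switchTs.
final class TimestampHandoff {
    private final long switchTs;

    TimestampHandoff(long switchTs) {
        this.switchTs = switchTs;
    }

    // Start Kafka early so the region around the boundary is read by both sides.
    long kafkaStartTimestamp(long overlap) {
        return switchTs - overlap;
    }

    boolean keepFromFiles(Event e) {
        return e.ts < switchTs;
    }

    boolean keepFromKafka(Event e) {
        return e.ts >= switchTs;
    }

    // Emits the filtered file records first, then the filtered Kafka records.
    List<Event> merge(List<Event> fromFiles, List<Event> fromKafka) {
        List<Event> out = new ArrayList<>();
        for (Event e : fromFiles) {
            if (keepFromFiles(e)) {
                out.add(e);
            }
        }
        for (Event e : fromKafka) {
            if (keepFromKafka(e)) {
                out.add(e);
            }
        }
        return out;
    }
}
```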
> > >
> > > The point is that the start positions can be configured by the user;
> > > there is no need to transfer any information from one source to another
> > > as part of switching.
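To illustrate why no information needs to flow between sources when the transition timestamp is known at submission, here is a small sketch that derives both the file end bound and the Kafka start position from one user-configured value. `FixedSwitchConfig` is a hypothetical name, and the sketch assumes every record carries its own timestamp; the overlap and filtering nuances mentioned above are deliberately left out.

```java
// Hypothetical configuration for a File -> Kafka chain whose transition
// timestamp is chosen by the user at job submission. Both bounds are derived
// from the same value, so no state flows between the sources at switch time.
final class FixedSwitchConfig {
    private final long transitionTimestampMs;

    FixedSwitchConfig(long transitionTimestampMs) {
        this.transitionTimestampMs = transitionTimestampMs;
    }

    // The file side keeps records strictly before the transition timestamp.
    boolean fileSourceIncludes(long recordTimestampMs) {
        return recordTimestampMs < transitionTimestampMs;
    }

    // The Kafka side starts at the transition timestamp (inclusive).
    long kafkaStartTimestampMs() {
        return transitionTimestampMs;
    }
}
```

With Flink's Kafka connector, a timestamp-based start would typically be expressed by passing `OffsetsInitializer.timestamp(...)` to `KafkaSource.builder().setStartingOffsets(...)`; the sketch leaves the connector out so that it stays self-contained.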
> > >
> > > It gets more complicated if we want to achieve a dynamic switch where the
> > > transition timestamp isn't known when the job starts. For example, consider
> > > a bootstrap scenario where the time taken to process historic data exceeds
> > > the Kafka retention. Here, we would need to dynamically resolve the Kafka
> > > start position based on where the file readers left off, when the switching
> > > occurs. The file source enumerator would determine at runtime when it is
> > > done handing splits to its readers, maybe when the max file timestamp
> > > reaches (processing time - X). This information needs to be transferred to
> > > the Kafka source.
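The runtime condition described above ("max file timestamp reaches processing time - X") can be sketched as a small helper that a file enumerator might consult whenever it discovers new files. `CatchUpCondition` is a hypothetical class, and treating file timestamps and processing time as epoch milliseconds is an assumption.

```java
import java.time.Duration;

// Hypothetical helper a file enumerator could consult whenever it discovers
// new files. It signals "done handing out splits" once the newest file is
// within maxLag ("X" in the discussion) of processing time.
final class CatchUpCondition {
    private final Duration maxLag;
    private long maxFileTimestampMs = Long.MIN_VALUE; // no file seen yet

    CatchUpCondition(Duration maxLag) {
        this.maxLag = maxLag;
    }

    void onFileDiscovered(long fileTimestampMs) {
        maxFileTimestampMs = Math.max(maxFileTimestampMs, fileTimestampMs);
    }

    // True when max file timestamp >= processing time - X.
    boolean caughtUp(long processingTimeMs) {
        return maxFileTimestampMs >= processingTimeMs - maxLag.toMillis();
    }

    // Position that would be transferred to the Kafka source at the switch.
    long switchTimestampMs() {
        return maxFileTimestampMs;
    }
}
```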
> > >
> > > The timestamp would need to be derived from the file enumerator state,
> > > either by looking at the last splits or explicitly. The natural way to do
> > > that is to introspect the enumerator state which gets checkpointed. Any
> > > other form of "end position" via a special interface would need to be
> > > derived in the same manner.
> > >
> > > The converter that will be provided by the user would look at the file
> > > enumerator state, derive the timestamp and then supply the "start position"
> > > to the Kafka source. The Kafka source was created when the job started. It
> > > needs to be augmented with the new start position. That can be achieved via
> > > a special enumerator interface like SwitchableSplitEnumerator#setStartState
> > > or by using restoreEnumerator with the checkpoint state constructed by the
> > > converter function. I'm leaning towards the latter as long as there is a
> > > convenient way to construct the state from a position (like
> > > enumStateForTimestamp). The converter would map one enum state to another
> > > and can be made very simple by providing a few utility functions instead of
> > > mandating a new interface that enumerators need to implement to become
> > > switchable.
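A minimal sketch of the converter approach, assuming greatly simplified enumerator states: the user-supplied function reads the checkpointed file enumerator state, takes the timestamp of the last assigned file, and builds a Kafka enumerator state through an `enumStateForTimestamp`-style utility, which in the approach above would then be handed to restoreEnumerator. `FileEnumState`, `KafkaEnumState` and `EnumStateConverters` are hypothetical; the real enumerator checkpoint classes look different.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical checkpointed file enumerator state: timestamps of the files
// whose splits were already handed to readers.
final class FileEnumState {
    final List<Long> assignedFileTimestampsMs;

    FileEnumState(List<Long> assignedFileTimestampsMs) {
        this.assignedFileTimestampsMs = assignedFileTimestampsMs;
    }
}

// Hypothetical checkpointed Kafka enumerator state: start timestamp per partition.
final class KafkaEnumState {
    final Map<Integer, Long> startTimestampByPartition;

    KafkaEnumState(Map<Integer, Long> startTimestampByPartition) {
        this.startTimestampByPartition = startTimestampByPartition;
    }
}

final class EnumStateConverters {
    // Utility in the spirit of "enumStateForTimestamp": starts every
    // partition at the given timestamp.
    static KafkaEnumState enumStateForTimestamp(long timestampMs, int numPartitions) {
        Map<Integer, Long> start = new HashMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            start.put(partition, timestampMs);
        }
        return new KafkaEnumState(start);
    }

    // User-supplied converter: derive the switch timestamp from the last
    // assigned file, then map it to a Kafka enumerator state.
    static Function<FileEnumState, KafkaEnumState> fileToKafka(int numPartitions) {
        return fileState -> {
            long lastTimestampMs = fileState.assignedFileTimestampsMs.stream()
                    .mapToLong(Long::longValue)
                    .max()
                    .orElseThrow(() -> new IllegalStateException("no files were assigned"));
            return enumStateForTimestamp(lastTimestampMs, numPartitions);
        };
    }
}
```

The converter is a plain function from one state to another, so enumerators themselves need no extra interface; the utility carries the connector-specific knowledge.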
> > >
> > > Again, a converter is only required when sources need to be switched based
> > > on positions not known at graph construction time.
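One way to picture "a converter is only required for deferred positions" is a chain entry whose start is either fixed at graph construction or computed at switch time. This is a hypothetical sketch; `ChainEntry`, `fixed` and `deferred` are illustrative names only.

```java
import java.util.function.Function;

// Hypothetical chain entry: the start position is either fixed at graph
// construction time, or deferred and computed by a converter from the previous
// source's enumerator state when the switch happens.
final class ChainEntry<PrevStateT, StartT> {
    private final StartT configuredStart;
    private final Function<PrevStateT, StartT> converter; // null for a fixed switch

    private ChainEntry(StartT configuredStart, Function<PrevStateT, StartT> converter) {
        this.configuredStart = configuredStart;
        this.converter = converter;
    }

    static <P, S> ChainEntry<P, S> fixed(S start) {
        return new ChainEntry<>(start, null);
    }

    static <P, S> ChainEntry<P, S> deferred(Function<P, S> converter) {
        return new ChainEntry<>(null, converter);
    }

    // Called at switch time; the previous state is only consulted if deferred.
    StartT resolveStart(PrevStateT previousEnumeratorState) {
        return converter == null ? configuredStart : converter.apply(previousEnumeratorState);
    }
}
```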
> > >
> > > I'm planning to add such deferred switching to the PoC for illustration
> > > and will share the experiment when that's done.
> > >
> > > Cheers,
> > > Thomas
> > >
> > >
> > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <pr...@163.com> wrote:
> > >
> > >> Hi Kezhu,
> > >>
> > >> Thanks for your detailed points on the Hybrid Source. I follow your
> > >> reasoning and respond to each point as follows:
> > >>
> > >> 1. Would it be possible to use the Hybrid Source to switch/chain
> > >> multiple homogeneous sources?
> > >>
> > >> "HybridSource" supports switching/chaining multiple homogeneous sources,
> > >> each with its own implementation of "SwitchableSource" and
> > >> "SwitchableSplitEnumerator". "HybridSource" doesn't restrict whether its
> > >> constituent sources are homogeneous. From the user's perspective, the user
> > >> only adds the "SwitchableSource" instances to the "HybridSource" and leaves
> > >> the smooth migration to "HybridSource".
> > >>
> > >> 2. Is "setStartState" actually a reposition operation for the next source
> > >> to start at job runtime?
> > >>
> > >> IMO, "setStartState" is used to determine the initial position of the new
> > >> source for smooth migration; it is not a reposition operation. More
> > >> importantly, the "State" mentioned here refers to the start and end
> > >> positions for reading the source.
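To make the "initial position, not a reposition" distinction concrete, here is a sketch in which an enumerator accepts a start state only before it starts. The interface merely mirrors the `SwitchableSplitEnumerator#setStartState` name used in this thread; it is not a Flink API, and `OffsetSplitEnumerator` and the reject-after-start rule are assumptions made for illustration.

```java
// Hypothetical interface modeled on the "SwitchableSplitEnumerator#setStartState"
// idea from this thread; it is not a Flink API.
interface SwitchableSplitEnumerator<StartT> {
    void setStartState(StartT startState);

    void start();
}

// Example enumerator whose start state is an initial position only: it may be
// set before start(), but not changed afterwards (no repositioning).
final class OffsetSplitEnumerator implements SwitchableSplitEnumerator<Long> {
    private long startOffset = 0L; // default when no start state is supplied
    private boolean started = false;

    @Override
    public void setStartState(Long startState) {
        if (started) {
            throw new IllegalStateException("start state must be set before start()");
        }
        startOffset = startState;
    }

    @Override
    public void start() {
        started = true;
        // A real enumerator would begin assigning splits from startOffset here.
    }

    long startOffset() {
        return startOffset;
    }
}
```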
> > >>
> > >> 3. In my opinion, shouldn't this conversion be an implementation detail of
> > >> the next source rather than a converter function?
> > >>
> > >> The state conversion is of course an implementation detail, and it is part
> > >> of the switching mechanism, which should expose the conversion to users as
> > >> an interface: the converter function. Moreover, once users have
> > >> implemented a "SwitchableSource" and added it to the Hybrid Source, they
> > >> don't need to implement another "SwitchableSource" for each different
> > >> conversion. From the user's perspective, users can define different
> > >> converter functions and create the "SwitchableSource" instances to add to
> > >> the "HybridSource", with no need to implement a Source for each converter
> > >> function.
> > >>
> > >> 4. There is no configurable start position. In this situation, is the
> > >> combination of the above three joints a no-op, and is "HybridSource" just a
> > >> chain of sources with pre-configured start positions?
> > >>
> > >> Indeed, there is currently no configurable start position, and this
> > >> configuration could be included in the feature. Users could use the
> > >> "SwitchableSplitEnumerator#setStartState" interface or configuration
> > >> parameters to configure the start position.
> > >>
> > >> 5. I wonder whether an end position is a must, and how it could be useful
> > >> for end users in a generic-enough source?
> > >>
> > >> The "getEndState" interface is used for the smooth migration scenario and
> > >> may return null if it is not needed. In the Hybrid Source mechanism, this
> > >> interface is required for switching between the constituent sources;
> > >> otherwise there is no way to get the end position of the upstream source.
> > >> In summary, the Hybrid Source needs to be able to set the start position
> > >> and get the end position of each Source; otherwise there is no point in
> > >> building a Hybrid Source.
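The end-to-start handoff described in this answer can be sketched as: read the upstream end state, convert it, and pass it to the downstream start hook. `EndStateProvider`, `StartStateReceiver` and `Handoff` are hypothetical names modeled on the `getEndState`/`setStartState` methods discussed here, not Flink interfaces. The rule that a null end state leaves the downstream source on its own configured start position is an assumption of this sketch.

```java
import java.util.function.Function;

// Hypothetical end/start hooks modeled on the getEndState/setStartState
// methods discussed in the thread; not part of Flink's Source API.
interface EndStateProvider<EndT> {
    // May return null when the source does not expose an end position.
    EndT getEndState();
}

interface StartStateReceiver<StartT> {
    void setStartState(StartT startState);
}

final class Handoff {
    // Switches from upstream to downstream. Returns true if a start state was
    // propagated; false if downstream keeps its own configured start position
    // (assumed behavior when getEndState() returns null).
    static <EndT, StartT> boolean switchSources(
            EndStateProvider<EndT> upstream,
            Function<EndT, StartT> converter,
            StartStateReceiver<StartT> downstream) {
        EndT end = upstream.getEndState();
        if (end == null) {
            return false;
        }
        downstream.setStartState(converter.apply(end));
        return true;
    }
}
```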
> > >>
> > >> 6. Is it possible for the converter function to do blocking operations? How
> > >> should checkpoint requests be handled when switching split enumerators
> > >> across sources? Do the end position and start position need to be stored
> > >> in checkpoint state or not?
> > >>
> > >> The converter function simply converts the state of the upstream source
> > >> into the state of the downstream source; it does not perform blocking
> > >> operations. Checkpoint requests that arrive while switching split
> > >> enumerators across sources are handled by sending the corresponding
> > >> "SourceEvent" for coordination. The end and start positions don't need to
> > >> be stored in checkpoint state; a source only needs to implement the
> > >> "getEndState" interface to expose its end position.
> > >>
> > >> Best,
> > >> Nicholas Jiang
> > >>
> > >>
> > >
> >