Posted to issues@flink.apache.org by "Steven Zhen Wu (Jira)" <ji...@apache.org> on 2020/10/27 03:18:00 UTC

[jira] [Comment Edited] (FLINK-19799) Make FileSource extensible

    [ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221102#comment-17221102 ] 

Steven Zhen Wu edited comment on FLINK-19799 at 10/27/20, 3:17 AM:
-------------------------------------------------------------------

We just went through a similar exercise with the [Iceberg source PoC|https://github.com/stevenzwu/iceberg/pull/2/files]. We want to make the split assigner pluggable (e.g. no ordering/locality guarantee, some ordering guarantee, locality-aware, etc.). Different assigners may checkpoint different state types, which is why we also had to add generic types for the assigner state and its serializer.

We made IcebergSource generic in the assigner, its state type, and the state serializer:
{code}
public class IcebergSource<T,
    SplitAssignerStateT extends SplitAssignerState,
    SplitAssignerT extends SplitAssigner<SplitAssignerStateT>,
    SplitAssignerStateSerializerT extends SplitAssignerStateSerializer<SplitAssignerStateT>>
{code}
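A minimal, self-contained sketch of why the extra type parameters help, assuming simplified interfaces: `SplitAssignerState`, `SplitAssigner`, `SplitAssignerStateSerializer`, `SimpleAssigner`, `SimpleState`, `SimpleStateSerializer` and `SourceSketch` below are hypothetical stand-ins, not the PoC's actual definitions. Because the assigner and the serializer are both bound to the same state type parameter, the source can checkpoint and restore assigner state without casts.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical types echoing the names in the comment; not the Iceberg PoC or Flink API.
interface SplitAssignerState {}

interface SplitAssigner<S extends SplitAssignerState> {
    String next();          // next split id, or null when none is pending
    S snapshotState();      // assigner-specific checkpoint state
}

interface SplitAssignerStateSerializer<S extends SplitAssignerState> {
    byte[] serialize(S state);
    S deserialize(byte[] bytes);
}

// One concrete assigner: FIFO order, state is the list of still-pending split ids.
final class SimpleState implements SplitAssignerState {
    final List<String> pending;
    SimpleState(List<String> pending) { this.pending = pending; }
}

final class SimpleAssigner implements SplitAssigner<SimpleState> {
    private final Deque<String> pending = new ArrayDeque<>();
    SimpleAssigner(List<String> splits) { pending.addAll(splits); }
    @Override public String next() { return pending.poll(); }
    @Override public SimpleState snapshotState() { return new SimpleState(new ArrayList<>(pending)); }
}

final class SimpleStateSerializer implements SplitAssignerStateSerializer<SimpleState> {
    @Override public byte[] serialize(SimpleState state) {
        return String.join("\n", state.pending).getBytes(StandardCharsets.UTF_8);
    }
    @Override public SimpleState deserialize(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        List<String> pending = new ArrayList<>();
        if (!text.isEmpty()) pending.addAll(Arrays.asList(text.split("\n")));
        return new SimpleState(pending);
    }
}

// T is the record type (unused here). The bounds force the assigner and the serializer
// to agree on StateT, so a mismatched pair fails to compile instead of failing on restore.
final class SourceSketch<T, StateT extends SplitAssignerState,
        AssignerT extends SplitAssigner<StateT>,
        SerializerT extends SplitAssignerStateSerializer<StateT>> {
    private final AssignerT assigner;
    private final SerializerT serializer;

    SourceSketch(AssignerT assigner, SerializerT serializer) {
        this.assigner = assigner;
        this.serializer = serializer;
    }

    AssignerT assigner() { return assigner; }

    // Checkpoint: snapshot the assigner and serialize it with the matching serializer.
    byte[] checkpoint() { return serializer.serialize(assigner.snapshotState()); }

    StateT restore(byte[] bytes) { return serializer.deserialize(bytes); }
}
```

Pairing `SimpleAssigner` with a serializer for some other state type would be rejected by the compiler, which is the main payoff of carrying the state and serializer types on the source class.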

We simplified construction with a static factory method that returns a fully typed builder:
{code}
  public static <T> Builder<T, SimpleSplitAssignerState, SimpleSplitAssigner,
      SimpleSplitAssignerStateSerializer> useSimpleAssigner(TableLoader tableLoader) {
    SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory();
    return new Builder<>(tableLoader, assignerFactory);
  }
{code}
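The effect of the static factory can be sketched as follows, again with hypothetical types: the `String` table name stands in for `TableLoader`, and `SourceSketch`, `SplitAssignerFactory`, `option(...)` and the `Simple*` classes are assumptions, not the PoC code. `useSimpleAssigner` fixes the state, assigner, and serializer type arguments once, so a caller only supplies the record type.

```java
import java.util.ArrayList;
import java.util.List;

// All names below are hypothetical stand-ins echoing the comment; this is not the Iceberg PoC code.
interface SplitAssignerState {}

interface SplitAssigner<S extends SplitAssignerState> {
    S snapshotState();
}

interface SplitAssignerStateSerializer<S extends SplitAssignerState> {
    byte[] serialize(S state);
}

// A factory ties one assigner implementation to its state and serializer types.
interface SplitAssignerFactory<S extends SplitAssignerState,
        A extends SplitAssigner<S>, Z extends SplitAssignerStateSerializer<S>> {
    A createAssigner();
    Z createSerializer();
}

final class SimpleSplitAssignerState implements SplitAssignerState {}

final class SimpleSplitAssigner implements SplitAssigner<SimpleSplitAssignerState> {
    @Override public SimpleSplitAssignerState snapshotState() { return new SimpleSplitAssignerState(); }
}

final class SimpleSplitAssignerStateSerializer
        implements SplitAssignerStateSerializer<SimpleSplitAssignerState> {
    @Override public byte[] serialize(SimpleSplitAssignerState state) { return new byte[0]; }
}

final class SimpleSplitAssignerFactory implements SplitAssignerFactory<
        SimpleSplitAssignerState, SimpleSplitAssigner, SimpleSplitAssignerStateSerializer> {
    @Override public SimpleSplitAssigner createAssigner() { return new SimpleSplitAssigner(); }
    @Override public SimpleSplitAssignerStateSerializer createSerializer() {
        return new SimpleSplitAssignerStateSerializer();
    }
}

final class SourceSketch<T, S extends SplitAssignerState,
        A extends SplitAssigner<S>, Z extends SplitAssignerStateSerializer<S>> {
    final String tableName;                 // stand-in for the TableLoader
    final SplitAssignerFactory<S, A, Z> assignerFactory;
    final List<String> options;

    private SourceSketch(Builder<T, S, A, Z> builder) {
        this.tableName = builder.tableName;
        this.assignerFactory = builder.assignerFactory;
        this.options = new ArrayList<>(builder.options);
    }

    A createAssigner() { return assignerFactory.createAssigner(); }

    // Static factory: binds S, A and Z once, so callers only choose the record type T.
    static <T> Builder<T, SimpleSplitAssignerState, SimpleSplitAssigner,
            SimpleSplitAssignerStateSerializer> useSimpleAssigner(String tableName) {
        return new Builder<>(tableName, new SimpleSplitAssignerFactory());
    }

    static final class Builder<T, S extends SplitAssignerState,
            A extends SplitAssigner<S>, Z extends SplitAssignerStateSerializer<S>> {
        private final String tableName;
        private final SplitAssignerFactory<S, A, Z> assignerFactory;
        private final List<String> options = new ArrayList<>();

        Builder(String tableName, SplitAssignerFactory<S, A, Z> assignerFactory) {
            this.tableName = tableName;
            this.assignerFactory = assignerFactory;
        }

        Builder<T, S, A, Z> option(String option) {
            options.add(option);
            return this;
        }

        SourceSketch<T, S, A, Z> build() { return new SourceSketch<>(this); }
    }
}
```

A caller that keeps a reference still has to spell out the full type, e.g. `SourceSketch<String, SimpleSplitAssignerState, SimpleSplitAssigner, SimpleSplitAssignerStateSerializer> source = SourceSketch.<String>useSimpleAssigner("db.table").build();` (or use `var` on Java 10+), which is the verbosity the chained form avoids.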

The end result is still simple for users, as long as they don't need to keep a reference to the IcebergSource object (whose declared type would have to spell out all four type parameters):
{code}
    final DataStream<RowData> stream = env.fromSource(
        IcebergSource.<RowData>useSimpleAssigner(tableLoader())
            .iteratorFactory(new RowDataIteratorFactory())
            .config(config)
            .scanContext(scanContext)
            .build(),
        ...
{code}



> Make FileSource extensible
> --------------------------
>
>                 Key: FLINK-19799
>                 URL: https://issues.apache.org/jira/browse/FLINK-19799
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>             Fix For: 1.12.0
>
>
> The File System Source currently assumes all formats can represent their work units as {{FileSourceSplit}}. If that is not the case, the formats cannot be implemented using the {{FileSource}}.
> We need to support extending the splits to carry additional information, and to use that information when creating bulk readers and handling split state.
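One way to read the requested extension, as a hedged sketch: the base split is subclassed to carry format-specific data, and both the reader factory and the split-state handling are made generic in the split type. `FileSplit`, `IndexedFileSplit`, `SplitReaderFactory`, `BlockListingReaderFactory`, `ExtensibleFileSource`, `withOffset` and the block-offset field are hypothetical names for illustration, not the Flink API that resolved this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base split; it only echoes the idea of FileSourceSplit and is not Flink's class.
class FileSplit {
    final String path;
    final long offset;   // position from which reading resumes

    FileSplit(String path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    // Copy of this split resuming at newOffset; subclasses override it to keep their extra fields.
    FileSplit withOffset(long newOffset) { return new FileSplit(path, newOffset); }
}

// Format-specific split carrying extra information (hypothetical block start offsets).
final class IndexedFileSplit extends FileSplit {
    final List<Long> blockStarts;

    IndexedFileSplit(String path, long offset, List<Long> blockStarts) {
        super(path, offset);
        this.blockStarts = blockStarts;
    }

    @Override
    IndexedFileSplit withOffset(long newOffset) {
        return new IndexedFileSplit(path, newOffset, blockStarts);
    }
}

// Reader factory parameterized by the split type, so it can read the extra fields without casts.
interface SplitReaderFactory<T, SplitT extends FileSplit> {
    List<T> read(SplitT split);
}

final class BlockListingReaderFactory implements SplitReaderFactory<String, IndexedFileSplit> {
    @Override
    public List<String> read(IndexedFileSplit split) {
        List<String> records = new ArrayList<>();
        for (long start : split.blockStarts) {
            if (start >= split.offset) {   // skip blocks before the resume position
                records.add(split.path + "@" + start);
            }
        }
        return records;
    }
}

// Source generic in the split type, so reading and checkpointing keep the concrete split type.
final class ExtensibleFileSource<T, SplitT extends FileSplit> {
    private final SplitReaderFactory<T, SplitT> readerFactory;

    ExtensibleFileSource(SplitReaderFactory<T, SplitT> readerFactory) {
        this.readerFactory = readerFactory;
    }

    List<T> readSplit(SplitT split) { return readerFactory.read(split); }

    // The unchecked cast is only safe if every FileSplit subclass overrides withOffset
    // to return its own type, as IndexedFileSplit does.
    @SuppressWarnings("unchecked")
    SplitT checkpoint(SplitT split, long offset) { return (SplitT) split.withOffset(offset); }
}
```

The unchecked cast in `checkpoint` is the price of keeping the base split class non-generic; a self-referential type parameter on the split would avoid it at the cost of noisier signatures.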



--
This message was sent by Atlassian Jira
(v8.3.4#803005)