You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jingsong Lee (JIRA)" <ji...@apache.org> on 2019/04/23 09:17:00 UTC

[jira] [Updated] (FLINK-11974) Introduce StreamOperatorFactory to help table perform the whole Operator CodeGen

     [ https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated FLINK-11974:
---------------------------------
    Summary: Introduce StreamOperatorFactory to help table perform the whole Operator CodeGen  (was: Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen)

> Introduce StreamOperatorFactory to help table perform the whole Operator CodeGen
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)