You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 吕宴全 <13...@qq.com.INVALID> on 2022/04/24 06:44:46 UTC

FlinkSQL 对接k8s的提交问题

我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:


CREATE TABLE T (
id INT
&nbsp;) WITH (
'connector.type' = 'filesystem',
'connector.path' = 'file:///tmp/tmp.csv',
'format.type' = 'csv',
'format.derive-schema' = 'true'
);



insert into T values(1);




insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行, 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per job这种模式的实现了。


我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?

Re: FlinkSQL 对接k8s的提交问题

Posted by Yang Wang <da...@gmail.com>.
目前Application模式确实不能支持已经生成好的JobGraph运行,我能想到一个work around的办法是就先写一个user
jar直接把JobGraph提交到local的集群里面

就像下面这样

public class JobGraphRunner {

    private static final Logger LOG =
LoggerFactory.getLogger(JobGraphRunner.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final String restServerAddress = "http://localhost:8081";
        LOG.info("Creating RestClusterClient({})", restServerAddress);

        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        try (ClusterClient<String> clusterClient =
                new RestClusterClient<>(
                        flinkConfig,
                        flinkConfig.toMap().get("kubernetes.cluster-id"),
                        (c, e) -> new
StandaloneClientHAServices(restServerAddress))) {
            final String jobGraphPath = params.get("jobgraph");
            Preconditions.checkNotNull(jobGraphPath, "--jobgraph
should be configured.");

            LOG.info("Loading jobgraph from {}", jobGraphPath);
            FileInputStream fileInputStream = new FileInputStream(jobGraphPath);
            ObjectInputStream objectInputStream = new
ObjectInputStream(fileInputStream);
            JobGraph jobGraph = (JobGraph) objectInputStream.readObject();
            objectInputStream.close();

            final JobID jobID = clusterClient.submitJob(jobGraph).get();
            LOG.info("Job {} is submitted successfully", jobID);
        }
    }
}


Best,
Yang

吕宴全 <13...@qq.com.invalid> 于2022年4月24日周日 14:45写道:

> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
> &nbsp;) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?

回复: FlinkSQL 对接k8s的提交问题

Posted by 文末丶 <80...@qq.com.INVALID>.
您好,可以试试 dlink 提交和托管 FlinkSQL。
https://github.com/DataLinkDC/dlink


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <wang4luning@gmail.com&gt;;
发送时间:&nbsp;2022年4月25日(星期一) 下午4:27
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: FlinkSQL 对接k8s的提交问题



SQL Client的Application模式现在还不支持,方案在设计中。
https://issues.apache.org/jira/browse/FLINK-26541

吕宴全 <1365976815@qq.com.invalid&gt; 于2022年4月24日周日 14:45写道:

&gt; 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
&gt; mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
&gt;
&gt;
&gt; CREATE TABLE T (
&gt; id INT
&gt; &amp;nbsp;) WITH (
&gt; 'connector.type' = 'filesystem',
&gt; 'connector.path' = 'file:///tmp/tmp.csv',
&gt; 'format.type' = 'csv',
&gt; 'format.derive-schema' = 'true'
&gt; );
&gt;
&gt;
&gt;
&gt; insert into T values(1);
&gt;
&gt;
&gt;
&gt;
&gt; insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
&gt; 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
&gt; 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
&gt; job这种模式的实现了。
&gt;
&gt;
&gt; 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?

Re: FlinkSQL 对接k8s的提交问题

Posted by LuNing Wang <wa...@gmail.com>.
SQL Client的Application模式现在还不支持,方案在设计中。
https://issues.apache.org/jira/browse/FLINK-26541

吕宴全 <13...@qq.com.invalid> 于2022年4月24日周日 14:45写道:

> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
> &nbsp;) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?