You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 吕宴全 <13...@qq.com.INVALID> on 2022/04/24 06:44:46 UTC
FlinkSQL 对接k8s的提交问题
我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
CREATE TABLE T (
id INT
) WITH (
'connector.type' = 'filesystem',
'connector.path' = 'file:///tmp/tmp.csv',
'format.type' = 'csv',
'format.derive-schema' = 'true'
);
insert into T values(1);
insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行, 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per job这种模式的实现了。
我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?
Re: FlinkSQL 对接k8s的提交问题
Posted by Yang Wang <da...@gmail.com>.
目前Application模式确实不能支持已经生成好的JobGraph运行,我能想到一个work around的办法是就先写一个user
jar直接把JobGraph提交到local的集群里面
就像下面这样
public class JobGraphRunner {
private static final Logger LOG =
LoggerFactory.getLogger(JobGraphRunner.class);
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final String restServerAddress = "http://localhost:8081";
LOG.info("Creating RestClusterClient({})", restServerAddress);
Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
try (ClusterClient<String> clusterClient =
new RestClusterClient<>(
flinkConfig,
flinkConfig.toMap().get("kubernetes.cluster-id"),
(c, e) -> new
StandaloneClientHAServices(restServerAddress))) {
final String jobGraphPath = params.get("jobgraph");
Preconditions.checkNotNull(jobGraphPath, "--jobgraph
should be configured.");
LOG.info("Loading jobgraph from {}", jobGraphPath);
FileInputStream fileInputStream = new FileInputStream(jobGraphPath);
ObjectInputStream objectInputStream = new
ObjectInputStream(fileInputStream);
JobGraph jobGraph = (JobGraph) objectInputStream.readObject();
objectInputStream.close();
final JobID jobID = clusterClient.submitJob(jobGraph).get();
LOG.info("Job {} is submitted successfully", jobID);
}
}
}
Best,
Yang
吕宴全 <13...@qq.com.invalid> 于2022年4月24日周日 14:45写道:
> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
> ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?
回复: FlinkSQL 对接k8s的提交问题
Posted by 文末丶 <80...@qq.com.INVALID>.
您好,可以试试 dlink 提交和托管 FlinkSQL。
https://github.com/DataLinkDC/dlink
------------------ 原始邮件 ------------------
发件人: "user-zh" <wang4luning@gmail.com>;
发送时间: 2022年4月25日(星期一) 下午4:27
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: FlinkSQL 对接k8s的提交问题
SQL Client的Application模式现在还不支持,方案在设计中。
https://issues.apache.org/jira/browse/FLINK-26541
吕宴全 <1365976815@qq.com.invalid> 于2022年4月24日周日 14:45写道:
> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
> &nbsp;) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?
Re: FlinkSQL 对接k8s的提交问题
Posted by LuNing Wang <wa...@gmail.com>.
SQL Client的Application模式现在还不支持,方案在设计中。
https://issues.apache.org/jira/browse/FLINK-26541
吕宴全 <13...@qq.com.invalid> 于2022年4月24日周日 14:45写道:
> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
> ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?