You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "hl9902@126.com" <hl...@126.com> on 2020/11/16 05:55:45 UTC

flink算子类在多个subtask中是各自初始化1个实例对象吗?

Hi,all:
flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?
我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?
希望有朋友能解释下算子在job运行中初始化的过程。

测试相关代码如下:
// flink 1.10.2版本,并行度为3
@Slf4j
public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> {
    private transient ValueState<Integer> state;

    public PersonFlatMap(){
        log.info(String.format("PersonFlatMap【%s】: 创建实例",this.toString()));
    }

    @Override
    public void open(Configuration parameters) throws IOException {
        //略去无关代码...
        log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString()));
    }

    @Override
    public void flatMap(Tuple2<String, String> t, Collector<Person> collector) throws Exception {
        Person p = JSONUtil.toObject(t.f1,Person.class);
        collector.collect(p);
        if(state.value() == null){state.update(0);}
        state.update(state.value() + 1);
        log.info("state: "+state.value());
    }
}

//测试日志输出
...
flink-10.2 - [2020-11-16 13:41:54.360] - INFO  [main] com.toonyoo.operator.PersonFlatMap  - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
//此处略去无关日志...
flink-10.2 - [2020-11-16 13:42:00.326] - INFO  [Flat Map -> Sink: Print to Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  - Initializing heap keyed state backend with stream factory.
flink-10.2 - [2020-11-16 13:42:00.351] - INFO  [Flat Map -> Sink: Print to Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap  - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.354] - INFO  [Flat Map -> Sink: Print to Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap  - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.356] - INFO  [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap  - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
...




hl9902@126.com

Re: flink算子类在多个subtask中是各自初始化1个实例对象吗?

Posted by tison <wa...@gmail.com>.
可以这么认为,大体上你可以认为每个并发有自己的环境。

技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM
值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。

一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM
上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。

可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html


hl9902@126.com <hl...@126.com> 于2020年11月16日周一 下午1:55写道:

> Hi,all:
>
> flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?
>
> 我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?
> 希望有朋友能解释下算子在job运行中初始化的过程。
>
> 测试相关代码如下:
> // flink 1.10.2版本,并行度为3
> @Slf4j
> public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String,
> String>, Person> {
>     private transient ValueState<Integer> state;
>
>     public PersonFlatMap(){
>         log.info(String.format("PersonFlatMap【%s】:
> 创建实例",this.toString()));
>     }
>
>     @Override
>     public void open(Configuration parameters) throws IOException {
>         //略去无关代码...
>         log.info(String.format("PersonFlatMap【%s】:初始化状态!",
> this.toString()));
>     }
>
>     @Override
>     public void flatMap(Tuple2<String, String> t, Collector<Person>
> collector) throws Exception {
>         Person p = JSONUtil.toObject(t.f1,Person.class);
>         collector.collect(p);
>         if(state.value() == null){state.update(0);}
>         state.update(state.value() + 1);
>         log.info("state: "+state.value());
>     }
> }
>
> //测试日志输出
> ...
> flink-10.2 - [2020-11-16 13:41:54.360] - INFO  [main]
> com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
> //此处略去无关日志...
> flink-10.2 - [2020-11-16 13:42:00.326] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
> - Initializing heap keyed state backend with stream factory.
> flink-10.2 - [2020-11-16 13:42:00.351] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.354] - INFO  [Flat Map -> Sink: Print to
> Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.356] - INFO  [Flat Map -> Sink: Print to
> Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
> ...
>
>
>
>
> hl9902@126.com
>