You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by butnet <bu...@163.com> on 2020/02/21 12:43:01 UTC

请问Flink Sink的close方法为什么会被反复执行,但open并没有被调用,他们不是成对出现吗?

Hi all:
请问Flink Sink的close方法为什么会被反复执行,但open并没有被调用,他们不是成对出现吗?
以下是Sink的实现和日志,Sink主要做数据库的异步输出,我在open和close中输出日志,
通过日志发现,open只调用了一次,后面非常多的close,请问什么原因,他们不应该是成对出现吗?
环境:JDK8, Flink: flink-1.8.2
Flink是通过标准集群方式(./start-cluster.sh)启动
感谢大家。


|

public class MySqlSink<T> extends CounterRichSinkFunction<T> {
private static final Logger log = LoggerFactory.getLogger(MySqlSink.class);
    private final MySqlUpsertor<T> upsertor;
    private transient volatile BatchThread<T> batchThread;
    private transient volatile DataBaseUtil dataBaseUtil;
    private final String name;

    public MySqlSink(String name) {
this(null, name);
}

public MySqlSink(MySqlUpsertor<T> upsertor, String name) {
this.upsertor = upsertor;
        this.name = name;
}

@Override
public String getName() {
return name;
}

@Override
public void invoke(T value, Context context) throws Exception {
try {
this.counter.inc();
batchThread.push(value);
} catch (Throwable e) {
this.counterError.inc();
log.info("异步输出异常: {}@{} {}", name, hashCode(), e.toString(), e);
}
    }

@Override
public void open(Configuration parameters) throws Exception {
log.info("异步输出 {}@{} open", name, hashCode());
        super.open(parameters);
        try {
if (dataBaseUtil == null) {
dataBaseUtil = DataBaseUtil.getInstance();
}
if (batchThread == null) {
batchThread = new BatchThread<>(getRuntimeContext(), dataBaseUtil, upsertor, name, () -> batchThread = null);
batchThread.start();
}
        } catch (Throwable e) {
log.info("创建异常输出线程异常: {}@{} {}", name, hashCode(), e.toString(), e);
}
    }

@Override
public void close() throws Exception {
log.info("异步输出 {}@{} close", name, hashCode());
        super.close();
}
}

|


日志:
2020-02-21 20:29:49 [           main] INFO [util.FlinkUtil            ] windowTime: 60000, paramWaterInterval: 40000
2020-02-21 20:29:50 [           main] WARN [job.alarm.LimitAlarmJob   ] 未配置 activityNgKafka 参数
2020-02-21 20:29:50 [           main] WARN [job.alarm.LimitAlarmJob   ] 未配置 remoteKafka 参数
2020-02-21 20:29:52 [: ng-host (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@97861042 open
2020-02-21 20:29:52 [k: ng-url (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@1582284274 open
2020-02-21 20:29:52 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 open
2020-02-21 20:29:52 [: ng-host (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@971447764 open
2020-02-21 20:29:52 [k: ng-url (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@793060686 open
2020-02-21 20:29:52 [k: ng-url (5/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@275482810 open
2020-02-21 20:29:52 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 open
2020-02-21 20:29:52 [: ng-host (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@989133776 open
2020-02-21 20:29:52 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 open
2020-02-21 20:29:52 [: ng-host (5/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@268528771 open
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-1 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-4 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-2 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-3 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-5 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-6 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-7 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-8 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-9 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-10 start
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@793060686 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@971447764 close
2020-02-21 20:30:41 [k: ng-url (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@1582284274 close
2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@971447764 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [: ng-host (5/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@268528771 close
2020-02-21 20:30:41 [: ng-host (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@97861042 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@989133776 close
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@793060686 close






--
天下事有难易乎,为之,则难者亦易矣;不为,则易者亦难矣。