You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by butnet <bu...@163.com> on 2020/02/21 12:43:01 UTC
请问Flink Sink的close方法为什么会被反复执行,但open并没有被调用,他们不是成对出现吗?
Hi all:
请问Flink Sink的close方法为什么会被反复执行,但open并没有被调用,他们不是成对出现吗?
以下是Sink的实现和日志,Sink主要做数据库的异步输出,我在open和close中输出日志,
通过日志发现,open只调用了一次,后面非常多的close,请问什么原因,他们不应该是成对出现吗?
环境:JDK8, Flink: flink-1.8.2
Flink是通过标准集群方式(./start-cluster.sh)启动
感谢大家。
|
public class MySqlSink<T> extends CounterRichSinkFunction<T> {
private static final Logger log = LoggerFactory.getLogger(MySqlSink.class);
private final MySqlUpsertor<T> upsertor;
private transient volatile BatchThread<T> batchThread;
private transient volatile DataBaseUtil dataBaseUtil;
private final String name;
public MySqlSink(String name) {
this(null, name);
}
public MySqlSink(MySqlUpsertor<T> upsertor, String name) {
this.upsertor = upsertor;
this.name = name;
}
@Override
public String getName() {
return name;
}
@Override
public void invoke(T value, Context context) throws Exception {
try {
this.counter.inc();
batchThread.push(value);
} catch (Throwable e) {
this.counterError.inc();
log.info("异步输出异常: {}@{} {}", name, hashCode(), e.toString(), e);
}
}
@Override
public void open(Configuration parameters) throws Exception {
log.info("异步输出 {}@{} open", name, hashCode());
super.open(parameters);
try {
if (dataBaseUtil == null) {
dataBaseUtil = DataBaseUtil.getInstance();
}
if (batchThread == null) {
batchThread = new BatchThread<>(getRuntimeContext(), dataBaseUtil, upsertor, name, () -> batchThread = null);
batchThread.start();
}
} catch (Throwable e) {
log.info("创建异常输出线程异常: {}@{} {}", name, hashCode(), e.toString(), e);
}
}
@Override
public void close() throws Exception {
log.info("异步输出 {}@{} close", name, hashCode());
super.close();
}
}
|
日志:
2020-02-21 20:29:49 [ main] INFO [util.FlinkUtil ] windowTime: 60000, paramWaterInterval: 40000
2020-02-21 20:29:50 [ main] WARN [job.alarm.LimitAlarmJob ] 未配置 activityNgKafka 参数
2020-02-21 20:29:50 [ main] WARN [job.alarm.LimitAlarmJob ] 未配置 remoteKafka 参数
2020-02-21 20:29:52 [: ng-host (3/5)] INFO [common.MySqlSink ] 异步输出 ng-host@97861042 open
2020-02-21 20:29:52 [k: ng-url (3/5)] INFO [common.MySqlSink ] 异步输出 ng-url@1582284274 open
2020-02-21 20:29:52 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 open
2020-02-21 20:29:52 [: ng-host (4/5)] INFO [common.MySqlSink ] 异步输出 ng-host@971447764 open
2020-02-21 20:29:52 [k: ng-url (2/5)] INFO [common.MySqlSink ] 异步输出 ng-url@793060686 open
2020-02-21 20:29:52 [k: ng-url (5/5)] INFO [common.MySqlSink ] 异步输出 ng-url@275482810 open
2020-02-21 20:29:52 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 open
2020-02-21 20:29:52 [: ng-host (2/5)] INFO [common.MySqlSink ] 异步输出 ng-host@989133776 open
2020-02-21 20:29:52 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 open
2020-02-21 20:29:52 [: ng-host (5/5)] INFO [common.MySqlSink ] 异步输出 ng-host@268528771 open
2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-1 start
2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-4 start
2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-2 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-3 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-5 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-6 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-7 start
2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-8 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-9 start
2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-10 start
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink ] 异步输出 ng-url@793060686 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink ] 异步输出 ng-host@971447764 close
2020-02-21 20:30:41 [k: ng-url (3/5)] INFO [common.MySqlSink ] 异步输出 ng-url@1582284274 close
2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink ] 异步输出 ng-host@971447764 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [: ng-host (5/5)] INFO [common.MySqlSink ] 异步输出 ng-host@268528771 close
2020-02-21 20:30:41 [: ng-host (3/5)] INFO [common.MySqlSink ] 异步输出 ng-host@97861042 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (2/5)] INFO [common.MySqlSink ] 异步输出 ng-host@989133776 close
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink ] 异步输出 ng-url@793060686 close
--
天下事有难易乎,为之,则难者亦易矣;不为,则易者亦难矣。