You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "wchxii@163.com" <wc...@163.com> on 2022/04/18 07:00:27 UTC

回复: flink发布集群端运行,如何在sink或source中使用springboot的bean

如果需要在flink使用spring的话, 需要在open方法加载applicationContext对象. 
你这里需要在sink的open方法初始化spring上下文对象. 

  override def open(conf: Configuration): Unit = {
    super.open(conf)
    if (Option(SpringContextHolder.getApplicationContext).isEmpty) {
      SpringContextHolder.startupApplicationContext(SelectStockJob.getClass, profiles)
    }
    repository = SpringContextHolder.getBean(classOf[xxxRepository])
  }



wchxii@163.com
 
发件人: 676360004@qq.com.INVALID
发送时间: 2022-04-18 14:28
收件人: user-zh@flink.apache.org
主题: flink发布集群端运行,如何在sink或source中使用springboot的bean
您好:
    首先很感谢您能在百忙之中看到我的邮件。在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。
    我通过网上已有的资料进行学习,在本地环境将springboot框架与flink进行结合,并可以成功运行。但是当我将项目通过maven打包成jar包后,发布到flink集群端时,在自定义sink和source类中无法获取到springboot的ApplicationContext,所以我想问下针对此情况是否有解决方案。
    下面是我代码的具体实现思路:
    1.通过实现Springboot的CommandLineRunner的run方法来起到等同于main方法的作用
@Component
@Slf4j
public class InitRunner implements CommandLineRunner {
 
    @Autowired
    private Constant constant;
 
    @Override
    public void run(String... args) throws Exception {
        //初始化启动
        log.error("初始化启动");
        start();
    }
 
    private void start() throws Exception {
        String actives = constant.getActive();
        if (StrUtil.isNotBlank(actives)){
            String[] active = actives.split(",");
            for (String act : active) {
                Class cls = Class.forName(constant.getProperty("streaming.tasks." + act + ".package"));
                AbstractStreamingTask streamingTask = (AbstractStreamingTask) cls
                        .getConstructor(new Class[]{String.class,Constant.class})
                        .newInstance(new Object[]{act, constant});
                System.err.println("1---");
                //此方法为flink调用
                streamingTask.streaming();
            }
        }
    }
}
    2.自定义类实现ApplicationContextAware接口,获得ApplicationContext的值,并用static修饰,准备在自定义sink和source类中通过该值获得相关的bean
 
@Component
@Slf4j
public class SpringApplicationContext implements ApplicationContextAware, Serializable {
 
    private static ApplicationContext applicationContext;
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        log.error("context先执行");
        if (this.applicationContext == null) {
            synchronized (SpringApplicationContext.class){
                if (this.applicationContext == null){
                    this.applicationContext = applicationContext;
                }
            }
        }
    }
 
 
    public static Object getBean(String name) {
        log.error("获取bean:"+name);
        return applicationContext.getBean(name);
    }
}
    3.在自定义sink类中,通过ApplicationContext获取我需要的bean
 
@Slf4j
public class MysqlSink extends RichSinkFunction<Test> implements BaseSink<Test>, Serializable {
 
    private BaseService baseService;
 
    private Constant constant;
 
    private String active;
 
    public MysqlSink(String active) {
        this.active = active;
    }
 
    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("------open mysqlSink");
        super.open(parameters);
        init();
    }
 
    @Override
    public void invoke(Test value, Context context) throws Exception {
        log.info("------invoke mysqlSink");
        if (value != null){
            baseService.operate(value);
        }
    }
 
    @Override
    public SinkFunction<Test>[] getSinkFunction() {
        return new SinkFunction[]{this};
    }
 
    private void init(){
        if (constant == null){
            constant = (Constant) SpringApplicationContext.getBean("constant");
        }
        if (baseService == null){
            baseService = (BaseService) SpringApplicationContext.getBean(constant.getProperty("streaming.tasks."+active+".sink.mysql.service"));
        }
    }
}
    经过我的测试,在发布到flink集群端并启动jar包时,applicationContext是可以获得正常的值得,但是在sink类中,值变为null。针对此种情况希望能从您那里获得相应解决方案,十分感谢。
 
 
676360004@qq.com