You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by jindy_liu <28...@qq.com> on 2020/07/14 09:38:40 UTC

flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化?

麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗?


        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
        at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
        at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
        at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
        at
com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)



代码=====>

public class PostgresSinkMapFunction extends RichFlatMapFunction<String,
String> {
    private static String driverClass = "org.postgresql.Driver";
    private static String dbUrl =
"jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
    private static String userNmae = "postgres";
    private static String passWord = "1";

    // 表status
    private static JdbcRowDataOutputFormat outputFormatStatus;
    private static String[] fieldNames = new String[] {"id", "name"};
    private static DataType[] fieldDataTypes = new DataType[]{
            DataTypes.INT(),
            DataTypes.STRING()};

    private static RowType rowType = RowType.of(
            Arrays.stream(fieldDataTypes)
                    .map(DataType::getLogicalType)
                    .toArray(LogicalType[]::new),
            fieldNames);
    private static RowDataTypeInfo rowDataTypeInfo =
RowDataTypeInfo.of(rowType);

    @Override
    public void flatMap(String s, Collector<String> collector) throws
Exception {
            GenericRowData row = new GenericRowData(2);

             row.setRowKind(INSERT);
             row.setField(0, count);
             row.setField(1, "jindy" + Integer.toString(count));

            outputFormatStatus.writeRecord(row);

    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        JdbcOptions jdbcOptions = JdbcOptions.builder()
                .setDriverName(driverClass)
                .setDBUrl(dbUrl)
                .setTableName("status_mirror")
                .setUsername(userNmae)
                .setPassword(passWord)
                .build();

        JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
                .withTableName(jdbcOptions.getTableName())
                .withDialect(jdbcOptions.getDialect())
                .withFieldNames(fieldNames)
                .build();

        outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
                .setJdbcOptions(jdbcOptions)
                .setFieldDataTypes(fieldDataTypes)
                .setJdbcDmlOptions(dmlOptions)
               
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
                .build();

        // set context,这里有问题!!!!!!!!!!!!!!!!!!
        outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
        outputFormatStatus.open(0, 1);
    }

    public void close() throws Exception {
        super.close();
        outputFormatStatus.close();
    }
}





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

Posted by jindy_liu <28...@qq.com>.
确实是这行导致的,
如果都重构了,那应该怎么用较好的?
我需要知道每一行对应的是insert, update还是delete事件。
或者问题变一下,对于这种api,一般遵守什么规则,flink的版本兼容性会更好?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

Posted by Jark Wu <im...@gmail.com>.
Hi,

从异常来看,应该是少了如下这一行:

       outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
               .setJdbcOptions(jdbcOptions)
               .setFieldDataTypes(fieldDataTypes)
               .setJdbcDmlOptions(dmlOptions)

 .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
               .setRowDataTypeInfo(rowDataTypeInfo) // 少了这一行
               .build();

顺便提醒下, `RowDataTypeInfo` 和 JdbcRowDataOutputFormat
都是内部类,不保证跨版本的兼容性(其实,在下个版本,这两个类都被重构了)。

Best,
Jark


On Tue, 14 Jul 2020 at 17:38, jindy_liu <28...@qq.com> wrote:

>
> 代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化?
>
> 麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗?
>
>
>         at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>         at
>
> org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
>         at
>
> org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
>         at
>
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
>         at
>
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
>         at
>
> org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
>         at
>
> com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
>         at
>
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at
>
> org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
> 代码=====>
>
> public class PostgresSinkMapFunction extends RichFlatMapFunction<String,
> String> {
>     private static String driverClass = "org.postgresql.Driver";
>     private static String dbUrl =
> "jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
>     private static String userNmae = "postgres";
>     private static String passWord = "1";
>
>     // 表status
>     private static JdbcRowDataOutputFormat outputFormatStatus;
>     private static String[] fieldNames = new String[] {"id", "name"};
>     private static DataType[] fieldDataTypes = new DataType[]{
>             DataTypes.INT(),
>             DataTypes.STRING()};
>
>     private static RowType rowType = RowType.of(
>             Arrays.stream(fieldDataTypes)
>                     .map(DataType::getLogicalType)
>                     .toArray(LogicalType[]::new),
>             fieldNames);
>     private static RowDataTypeInfo rowDataTypeInfo =
> RowDataTypeInfo.of(rowType);
>
>     @Override
>     public void flatMap(String s, Collector<String> collector) throws
> Exception {
>             GenericRowData row = new GenericRowData(2);
>
>              row.setRowKind(INSERT);
>              row.setField(0, count);
>              row.setField(1, "jindy" + Integer.toString(count));
>
>             outputFormatStatus.writeRecord(row);
>
>     }
>
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>
>         JdbcOptions jdbcOptions = JdbcOptions.builder()
>                 .setDriverName(driverClass)
>                 .setDBUrl(dbUrl)
>                 .setTableName("status_mirror")
>                 .setUsername(userNmae)
>                 .setPassword(passWord)
>                 .build();
>
>         JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
>                 .withTableName(jdbcOptions.getTableName())
>                 .withDialect(jdbcOptions.getDialect())
>                 .withFieldNames(fieldNames)
>                 .build();
>
>         outputFormatStatus =
> JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
>                 .setJdbcOptions(jdbcOptions)
>                 .setFieldDataTypes(fieldDataTypes)
>                 .setJdbcDmlOptions(dmlOptions)
>
> .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
>                 .build();
>
>         // set context,这里有问题!!!!!!!!!!!!!!!!!!
>         outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
>         outputFormatStatus.open(0, 1);
>     }
>
>     public void close() throws Exception {
>         super.close();
>         outputFormatStatus.close();
>     }
> }
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>