Posted to user-zh@flink.apache.org by jindy_liu <28...@qq.com> on 2020/07/14 09:38:40 UTC
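flink 1.11: using JdbcRowDataOutputFormat inside a custom RichFlatMapFunction to write to PostgreSQL; RuntimeContext initialization problem (NullPointerException or "RuntimeContext not initialized"). What am I doing wrong?
The code compiles fine, but at runtime, when the RichFlatMapFunction is opened, JdbcRowDataOutputFormat.open crashes and reports that the RuntimeContext is null. If I remove outputFormatStatus.setRuntimeContext(this.getRuntimeContext()), it complains instead that the context has not been initialized.
Could someone please take a look? What is the problem; am I using it incorrectly?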
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
    at org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
    at org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
    at com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Code =====>
public class PostgresSinkMapFunction extends RichFlatMapFunction<String, String> {
    private static String driverClass = "org.postgresql.Driver";
    private static String dbUrl = "jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
    private static String userNmae = "postgres";
    private static String passWord = "1";

    // table "status"
    private static JdbcRowDataOutputFormat outputFormatStatus;
    private static String[] fieldNames = new String[] {"id", "name"};
    private static DataType[] fieldDataTypes = new DataType[]{
        DataTypes.INT(),
        DataTypes.STRING()};

    private static RowType rowType = RowType.of(
        Arrays.stream(fieldDataTypes)
            .map(DataType::getLogicalType)
            .toArray(LogicalType[]::new),
        fieldNames);
    private static RowDataTypeInfo rowDataTypeInfo = RowDataTypeInfo.of(rowType);

    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        GenericRowData row = new GenericRowData(2);

        row.setRowKind(INSERT);
        row.setField(0, count);
        row.setField(1, "jindy" + Integer.toString(count));

        outputFormatStatus.writeRecord(row);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        JdbcOptions jdbcOptions = JdbcOptions.builder()
            .setDriverName(driverClass)
            .setDBUrl(dbUrl)
            .setTableName("status_mirror")
            .setUsername(userNmae)
            .setPassword(passWord)
            .build();

        JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
            .withTableName(jdbcOptions.getTableName())
            .withDialect(jdbcOptions.getDialect())
            .withFieldNames(fieldNames)
            .build();

        outputFormatStatus = JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
            .setJdbcOptions(jdbcOptions)
            .setFieldDataTypes(fieldDataTypes)
            .setJdbcDmlOptions(dmlOptions)
            .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
            .build();

        // set context; this is where the problem is!
        outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
        outputFormatStatus.open(0, 1);
    }

    public void close() throws Exception {
        super.close();
        outputFormatStatus.close();
    }
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/
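Re: flink 1.11: using JdbcRowDataOutputFormat inside a custom RichFlatMapFunction to write to PostgreSQL; RuntimeContext initialization problem (NullPointerException or "RuntimeContext not initialized"). What am I doing wrong?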
Posted by jindy_liu <28...@qq.com>.
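Indeed, that line was the cause.
If both classes have been refactored, what is the better way to use them?
I need to know whether each row corresponds to an insert, update, or delete event.
Or, to put the question differently: for this kind of API, what rules should one generally follow to get better compatibility across Flink versions?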
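Re: flink 1.11: using JdbcRowDataOutputFormat inside a custom RichFlatMapFunction to write to PostgreSQL; RuntimeContext initialization problem (NullPointerException or "RuntimeContext not initialized"). What am I doing wrong?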
Posted by Jark Wu <im...@gmail.com>.
Hi,
Judging from the exception, the following line seems to be missing:
outputFormatStatus = JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
    .setJdbcOptions(jdbcOptions)
    .setFieldDataTypes(fieldDataTypes)
    .setJdbcDmlOptions(dmlOptions)
    .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
    .setRowDataTypeInfo(rowDataTypeInfo) // this line was missing
    .build();
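The stack trace hints at why the omission only shows up at runtime: the NullPointerException is thrown inside a lambda created in the constructor (lambda$new$...) and invoked from open(), so build() itself succeeds. Below is a toy model of that pattern in plain Java, with no Flink dependency. ToyOutputFormat, setTypeInfo and buildChecked are hypothetical names used only for illustration; they are not Flink classes or methods, and this is not the Flink source.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Toy model only; none of these names are Flink classes. */
final class ToyOutputFormat {
    private final Supplier<String> executorFactory;

    private ToyOutputFormat(String typeInfo) {
        // The lambda captures typeInfo but dereferences it only when invoked,
        // so a null value goes unnoticed at construction time.
        this.executorFactory = () -> "executor for " + typeInfo.trim();
    }

    /** Invokes the deferred factory; this is where a null typeInfo blows up. */
    String open() {
        return executorFactory.get();
    }

    static final class Builder {
        private String typeInfo;

        Builder setTypeInfo(String typeInfo) {
            this.typeInfo = typeInfo;
            return this;
        }

        /** Lenient build: accepts a missing typeInfo, failure is deferred to open(). */
        ToyOutputFormat build() {
            return new ToyOutputFormat(typeInfo);
        }

        /** Strict build: rejects a missing typeInfo immediately with a clear message. */
        ToyOutputFormat buildChecked() {
            Objects.requireNonNull(typeInfo, "typeInfo must be set before build()");
            return new ToyOutputFormat(typeInfo);
        }
    }
}
```

Here build() returns normally with the field unset and the failure appears later in open(), which matches the shape of the trace; buildChecked() shows how a builder could reject the missing field up front.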
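By the way, note that both `RowDataTypeInfo` and JdbcRowDataOutputFormat are internal classes, and compatibility across versions is not guaranteed (in fact, both classes have been refactored in the next version).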
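For the need to tell insert, update, and delete rows apart without relying on internal classes, one option is to pick the SQL statement per change type and run it through plain JDBC. A minimal sketch follows, assuming `id` is the primary key of status_mirror (the thread does not say so). ChangeKind is a hypothetical stand-in that mirrors the four values of Flink's RowKind, and ChangeSql is likewise an illustrative name.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical stand-in mirroring the four values of Flink's RowKind. */
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class ChangeSql {
    private ChangeSql() {}

    /**
     * PostgreSQL upsert. Assumes fields[0] is the primary key and that
     * there is at least one further column to update.
     */
    static String upsert(String table, String[] fields) {
        String cols = String.join(", ", fields);
        String marks = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
        String updates = Arrays.stream(fields).skip(1)
                .map(f -> f + " = EXCLUDED." + f)
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")"
                + " ON CONFLICT (" + fields[0] + ") DO UPDATE SET " + updates;
    }

    /** Delete by key; keyField is assumed to identify the row uniquely. */
    static String delete(String table, String keyField) {
        return "DELETE FROM " + table + " WHERE " + keyField + " = ?";
    }

    /** Chooses a statement per change type; UPDATE_BEFORE rows are skipped. */
    static Optional<String> statementFor(ChangeKind kind, String table, String[] fields) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return Optional.of(upsert(table, fields));
            case DELETE:
                return Optional.of(delete(table, fields[0]));
            case UPDATE_BEFORE:
            default:
                return Optional.empty();
        }
    }
}
```

Treating INSERT and UPDATE_AFTER as the same upsert keeps the write idempotent if a change is replayed; the returned string would then be passed to java.sql.Connection.prepareStatement and bound with the row's values.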
Best,
Jark