Posted to user-zh@flink.apache.org by 奔跑的小飞袁 <s_...@126.com> on 2020/10/13 10:06:51 UTC

NullPointerException on map values in flink-SQL 1.11

hello
I am using the map type with flink-sql 1.11, but I ran into a problem: when a value in the map is null, a NullPointerException is thrown. My error and source code are attached below.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.io.IOException: Failed to deserialize Avro record.
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NullPointerException
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1]
	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[lexus-flink_2.11-0.1.jar:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]

private static DeserializationRuntimeConverter createMapConverter(LogicalType type) {
	final DeserializationRuntimeConverter keyConverter = createConverter(
		DataTypes.STRING().getLogicalType());
	// The value converter comes from createConverter, which assumes a
	// non-null input; this is where the NPE above originates.
	final DeserializationRuntimeConverter valueConverter = createConverter(
		extractValueTypeToAvroMap(type));
	return avroObject -> {
		final Map<?, ?> map = (Map<?, ?>) avroObject;
		Map<Object, Object> result = new HashMap<>();
		for (Map.Entry<?, ?> entry : map.entrySet()) {
			Object key = keyConverter.convert(entry.getKey());
			// entry.getValue() may be null for a nullable Avro map value.
			Object value = valueConverter.convert(entry.getValue());
			result.put(key, value);
		}
		return new GenericMapData(result);
	};
}
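
For reference, the kind of record that triggers it looks roughly like this (a hand-written sketch, not my real schema; the record name "Demo" and the key "k" are made up, and it assumes the map's value union allows null):

// Uses org.apache.avro.Schema and org.apache.avro.generic.GenericData/GenericRecord.
Schema schema = new Schema.Parser().parse(
	"{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
	+ "{\"name\":\"other_para\",\"type\":{\"type\":\"map\","
	+ "\"values\":[\"null\",\"string\"]}}]}");
GenericRecord record = new GenericData.Record(schema);
Map<String, String> m = new HashMap<>();
m.put("k", null); // this null value is what reaches valueConverter.convert(...) above
record.put("other_para", m);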






Re: NullPointerException on map values in flink-SQL 1.11

Posted by 奔跑的小飞袁 <s_...@126.com>.
other_para MAP<VARCHAR,VARCHAR> is the map type I defined




Re: NullPointerException on map values in flink-SQL 1.11

Posted by 奔跑的小飞袁 <s_...@126.com>.
OK, thanks.




Re: Re: NullPointerException on map values in flink-SQL 1.11

Posted by 奔跑的小飞袁 <s_...@126.com>.
OK, I'll give it a try.




Re: Re: Re: NullPointerException on map values in flink-SQL 1.11

Posted by 奔跑的小飞袁 <s_...@126.com>.
I had patched the source code earlier and did not restore it before testing; it turns out the Map<STRING,STRING NULL> approach does not work.




Re: Re: NullPointerException on map values in flink-SQL 1.11

Posted by 史 正超 <sh...@outlook.com>.
Benchao Li's approach is the right one; this is a bug in the avro format:

private static AvroToRowDataConverter createMapConverter(LogicalType type) {
   final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType());
   final AvroToRowDataConverter valueConverter = createConverter(extractValueTypeToAvroMap(type));

   return avroObject -> {
      final Map<?, ?> map = (Map<?, ?>) avroObject;
      Map<Object, Object> result = new HashMap<>();
      for (Map.Entry<?, ?> entry : map.entrySet()) {
         Object key = keyConverter.convert(entry.getKey());
         Object value = valueConverter.convert(entry.getValue());
         result.put(key, value);
      }
      return new GenericMapData(result);
   };
}

It should be createNullableConverter:

final AvroToRowDataConverter valueConverter = createNullableConverter(extractValueTypeToAvroMap(type));
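
For context, createNullableConverter in the same class wraps the inner converter with a null check, roughly like this (paraphrased from memory, not a verbatim copy of the Flink source):

private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
   final AvroToRowDataConverter converter = createConverter(type);
   return avroObject -> {
      // Return null directly instead of handing it to a converter that
      // assumes a non-null input.
      if (avroObject == null) {
         return null;
      }
      return converter.convert(avroObject);
   };
}

With that one-line change, a null map value comes straight out as null instead of blowing up in the string converter.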

________________________________
From: 史 正超 <sh...@outlook.com>
Sent: October 14, 2020 5:22
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: Re: Re: NullPointerException on map values in flink-SQL 1.11

Are you sure? It still fails in my tests; this should be a bug in avro.
________________________________
From: 奔跑的小飞袁 <s_...@126.com>
Sent: October 14, 2020 3:29
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: Re: Re: NullPointerException on map values in flink-SQL 1.11

I tried defining my type as MAP<STRING, STRING NULL>, and the problem is solved. Thanks.




Re: Re: NullPointerException on map values in flink-SQL 1.11

Posted by 史 正超 <sh...@outlook.com>.
Are you sure? It still fails in my tests; this should be a bug in avro.
________________________________
From: 奔跑的小飞袁 <s_...@126.com>
Sent: October 14, 2020 3:29
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: Re: Re: NullPointerException on map values in flink-SQL 1.11

I tried defining my type as MAP<STRING, STRING NULL>, and the problem is solved. Thanks.




Re: Re: NullPointerException on map values in flink-SQL 1.11

Posted by 奔跑的小飞袁 <s_...@126.com>.
I tried defining my type as MAP<STRING, STRING NULL>, and the problem is solved. Thanks.




Re: NullPointerException on map values in flink-SQL 1.11

Posted by 史 正超 <sh...@outlook.com>.
So my suggestion is to follow the avro conventions; you can define your MAP type like this:
MAP<STRING, STRING NULL>
________________________________
From: 史 正超 <sh...@outlook.com>
Sent: October 14, 2020 2:45
To: user-zh <us...@flink.apache.org>
Subject: Re: NullPointerException on map values in flink-SQL 1.11


But the method carries this comment: "Creates a runtime converter which assuming input object is not null."
The code is written on the premise that the value is never allowed to be null.
________________________________
From: Benchao Li <li...@apache.org>
Sent: October 14, 2020 2:34
To: user-zh <us...@flink.apache.org>
Subject: Re: NullPointerException on map values in flink-SQL 1.11

Yeah, this should be an implementation bug; you could open an issue to get it fixed~

史 正超 <sh...@outlook.com> wrote on Wed, Oct 14, 2020 at 10:19 AM:

> Judging from your exception, the format you are using is avro. I took a look at the source; its convert for the varchar type is different from the json format's. The avro code is:
>
> case CHAR:
> case VARCHAR:
>    return avroObject -> StringData.fromString(avroObject.toString());
>
> So when a value of your map type is null, a NullPointerException is thrown.
> ________________________________
> From: 奔跑的小飞袁 <s_...@126.com>
> Sent: October 14, 2020 1:46
> To: user-zh@flink.apache.org <us...@flink.apache.org>
> Subject: Re: NullPointerException on map values in flink-SQL 1.11
>
> other_para MAP<VARCHAR,VARCHAR>
>
>
>
>


--

Best,
Benchao Li

Re: NullPointerException on map values in flink-SQL 1.11

Posted by Benchao Li <li...@apache.org>.
Yes, that's why it should use createNullableConverter rather than createConverter.

史 正超 <sh...@outlook.com> wrote on Wed, Oct 14, 2020 at 10:45 AM:

>
> But the method carries this comment: "Creates a runtime converter which assuming input object is
> not null."
> The code is written on the premise that the value is never allowed to be null.
> ________________________________
> From: Benchao Li <li...@apache.org>
> Sent: October 14, 2020 2:34
> To: user-zh <us...@flink.apache.org>
> Subject: Re: NullPointerException on map values in flink-SQL 1.11
>
> Yeah, this should be an implementation bug; you could open an issue to get it fixed~
>
> 史 正超 <sh...@outlook.com> wrote on Wed, Oct 14, 2020 at 10:19 AM:
>
> > Judging from your exception, the format you are using is avro. I took a look at the source; its convert for the varchar type is different from the json format's. The avro code is:
> >
> > case CHAR:
> > case VARCHAR:
> >    return avroObject -> StringData.fromString(avroObject.toString());
> >
> > So when a value of your map type is null, a NullPointerException is thrown.
> > ________________________________
> > From: 奔跑的小飞袁 <s_...@126.com>
> > Sent: October 14, 2020 1:46
> > To: user-zh@flink.apache.org <us...@flink.apache.org>
> > Subject: Re: NullPointerException on map values in flink-SQL 1.11
> >
> > other_para MAP<VARCHAR,VARCHAR>
> >
> >
> >
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li

Re: NullPointerException on map values in flink-SQL 1.11

Posted by 史 正超 <sh...@outlook.com>.
But the method carries this comment: "Creates a runtime converter which assuming input object is not null."
The code is written on the premise that the value is never allowed to be null.
________________________________
From: Benchao Li <li...@apache.org>
Sent: October 14, 2020 2:34
To: user-zh <us...@flink.apache.org>
Subject: Re: NullPointerException on map values in flink-SQL 1.11

Yeah, this should be an implementation bug; you could open an issue to get it fixed~

史 正超 <sh...@outlook.com> wrote on Wed, Oct 14, 2020 at 10:19 AM:

> Judging from your exception, the format you are using is avro. I took a look at the source; its convert for the varchar type is different from the json format's. The avro code is:
>
> case CHAR:
> case VARCHAR:
>    return avroObject -> StringData.fromString(avroObject.toString());
>
> So when a value of your map type is null, a NullPointerException is thrown.
> ________________________________
> From: 奔跑的小飞袁 <s_...@126.com>
> Sent: October 14, 2020 1:46
> To: user-zh@flink.apache.org <us...@flink.apache.org>
> Subject: Re: NullPointerException on map values in flink-SQL 1.11
>
> other_para MAP<VARCHAR,VARCHAR>
>
>
>
>


--

Best,
Benchao Li

Re: NullPointerException on map values in flink-SQL 1.11

Posted by Benchao Li <li...@apache.org>.
Yeah, this should be an implementation bug; you could open an issue to get it fixed~

史 正超 <sh...@outlook.com> wrote on Wed, Oct 14, 2020 at 10:19 AM:

> Judging from your exception, the format you are using is avro. I took a look at the source; its convert for the varchar type is different from the json format's. The avro code is:
>
> case CHAR:
> case VARCHAR:
>    return avroObject -> StringData.fromString(avroObject.toString());
>
> So when a value of your map type is null, a NullPointerException is thrown.
> ________________________________
> From: 奔跑的小飞袁 <s_...@126.com>
> Sent: October 14, 2020 1:46
> To: user-zh@flink.apache.org <us...@flink.apache.org>
> Subject: Re: NullPointerException on map values in flink-SQL 1.11
>
> other_para MAP<VARCHAR,VARCHAR>
>
>
>
>


-- 

Best,
Benchao Li

Re: NullPointerException on map values in flink-SQL 1.11

Posted by 史 正超 <sh...@outlook.com>.
Judging from your exception, the format you are using is avro. I took a look at the source; its convert for the varchar type is different from the json format's. The avro code is:

case CHAR:
case VARCHAR:
   return avroObject -> StringData.fromString(avroObject.toString());

So when a value of your map type is null, a NullPointerException is thrown.
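
A null check at that point would tolerate it, along these lines (just a sketch of the idea, not the actual fix that went into Flink):

case CHAR:
case VARCHAR:
   // Guard before toString(): a null avroObject is exactly what triggers the NPE.
   return avroObject -> avroObject == null
      ? null
      : StringData.fromString(avroObject.toString());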
________________________________
From: 奔跑的小飞袁 <s_...@126.com>
Sent: October 14, 2020 1:46
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: Re: NullPointerException on map values in flink-SQL 1.11

other_para MAP<VARCHAR,VARCHAR>




Re: NullPointerException on map values in flink-SQL 1.11

Posted by 奔跑的小飞袁 <s_...@126.com>.
other_para MAP<VARCHAR,VARCHAR>




Re: NullPointerException on map values in flink-SQL 1.11

Posted by Benchao Li <li...@apache.org>.
Hi,

What type is your map? I'll try to reproduce it.

奔跑的小飞袁 <s_...@126.com> wrote on Tue, Oct 13, 2020 at 6:07 PM:

> hello
> I am using the map type with flink-sql 1.11, but I ran into a problem: when a value in the map is null, a NullPointerException is thrown. My error and source code are attached below.


-- 

Best,
Benchao Li