You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by allan <18...@163.com> on 2019/09/29 08:31:01 UTC
map不能返回null值吗
Hi,
发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。
java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数 record为空的情况下会报异常。
难道map 不能返回null值吗?
@Override
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
// we know that the given outputTag matches our OutputTag so the record
// must be of the type that our operator (and Serializer) expects.
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;
numRecordsIn.inc();
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
if (outputTag != null) {
// Enrich error message
ClassCastException replace = new ClassCastException(
String.format(
"%s. Failed to push OutputTag with id '%s' to operator. " +
"This can occur when multiple OutputTags with different types " +
"but identical names are being used.",
e.getMessage(),
outputTag.getId()));
throw new ExceptionInChainedOperatorException(replace);
} else {
throw new ExceptionInChainedOperatorException(e);
}
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
}
Re:Re: map不能返回null值吗
Posted by a**** <18...@163.com>.
ok,我知道了。确定一下,之前没发现,跟了一下代码,所以问一下。多谢!
在 2019-09-29 16:44:53,"Qi Luo" <lu...@gmail.com> 写道:
>Hi Allan,
>
>map只能返回非null,你可以考虑使用flatMap。
>
>Qi
>
>On Sun, Sep 29, 2019 at 4:31 PM allan <18...@163.com> wrote:
>
>> Hi,
>> 发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。
>>
>> java.lang.NullPointerException
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>> at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>> at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>> at org.apache.flink.streaming.runtime.io
>> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> 经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数 record为空的情况下会报异常。
>> 难道map 不能返回null值吗?
>>
>>
>> @Override
>> protected <X> void pushToOperator(StreamRecord<X> record) {
>> try {
>> // we know that the given outputTag matches our OutputTag so the
>> record
>> // must be of the type that our operator (and Serializer) expects.
>> @SuppressWarnings("unchecked")
>> StreamRecord<T> castRecord = (StreamRecord<T>) record;
>>
>> numRecordsIn.inc();
>> StreamRecord<T> copy =
>> castRecord.copy(serializer.copy(castRecord.getValue()));
>> operator.setKeyContextElement1(copy);
>> operator.processElement(copy);
>> } catch (ClassCastException e) {
>> if (outputTag != null) {
>> // Enrich error message
>> ClassCastException replace = new ClassCastException(
>> String.format(
>> "%s. Failed to push OutputTag with id '%s' to operator.
>> " +
>> "This can occur when multiple OutputTags with
>> different types " +
>> "but identical names are being used.",
>> e.getMessage(),
>> outputTag.getId()));
>>
>> throw new ExceptionInChainedOperatorException(replace);
>> } else {
>> throw new ExceptionInChainedOperatorException(e);
>> }
>> } catch (Exception e) {
>> throw new ExceptionInChainedOperatorException(e);
>> }
>>
>> }
>> }
>>
>>
>>
Re: map不能返回null值吗
Posted by Qi Luo <lu...@gmail.com>.
Hi Allan,
map只能返回非null,你可以考虑使用flatMap。
Qi
On Sun, Sep 29, 2019 at 4:31 PM allan <18...@163.com> wrote:
> Hi,
> 发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。
>
> java.lang.NullPointerException
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
>
> 经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数 record为空的情况下会报异常。
> 难道map 不能返回null值吗?
>
>
> @Override
> protected <X> void pushToOperator(StreamRecord<X> record) {
> try {
> // we know that the given outputTag matches our OutputTag so the
> record
> // must be of the type that our operator (and Serializer) expects.
> @SuppressWarnings("unchecked")
> StreamRecord<T> castRecord = (StreamRecord<T>) record;
>
> numRecordsIn.inc();
> StreamRecord<T> copy =
> castRecord.copy(serializer.copy(castRecord.getValue()));
> operator.setKeyContextElement1(copy);
> operator.processElement(copy);
> } catch (ClassCastException e) {
> if (outputTag != null) {
> // Enrich error message
> ClassCastException replace = new ClassCastException(
> String.format(
> "%s. Failed to push OutputTag with id '%s' to operator.
> " +
> "This can occur when multiple OutputTags with
> different types " +
> "but identical names are being used.",
> e.getMessage(),
> outputTag.getId()));
>
> throw new ExceptionInChainedOperatorException(replace);
> } else {
> throw new ExceptionInChainedOperatorException(e);
> }
> } catch (Exception e) {
> throw new ExceptionInChainedOperatorException(e);
> }
>
> }
> }
>
>
>