You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by allan <18...@163.com> on 2019/09/29 08:31:01 UTC

map不能返回null值吗

Hi,
发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。

java.lang.NullPointerException
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)


经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数 record为空的情况下会报异常。
难道map 不能返回null值吗?


@Override
   protected <X> void pushToOperator(StreamRecord<X> record) {
      try {
         // we know that the given outputTag matches our OutputTag so the record
         // must be of the type that our operator (and Serializer) expects.
         @SuppressWarnings("unchecked")
         StreamRecord<T> castRecord = (StreamRecord<T>) record;

         numRecordsIn.inc();
         StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
      } catch (ClassCastException e) {
         if (outputTag != null) {
            // Enrich error message
            ClassCastException replace = new ClassCastException(
               String.format(
                  "%s. Failed to push OutputTag with id '%s' to operator. " +
                     "This can occur when multiple OutputTags with different types " +
                     "but identical names are being used.",
                  e.getMessage(),
                  outputTag.getId()));

            throw new ExceptionInChainedOperatorException(replace);
         } else {
            throw new ExceptionInChainedOperatorException(e);
         }
      } catch (Exception e) {
         throw new ExceptionInChainedOperatorException(e);
      }

   }
}



Re:Re: map不能返回null值吗

Posted by a**** <18...@163.com>.


ok,我知道了。确定一下,之前没发现,跟了一下代码,所以问一下。多谢!





在 2019-09-29 16:44:53,"Qi Luo" <lu...@gmail.com> 写道:
>Hi Allan,
>
>map只能返回非null,你可以考虑使用flatMap。
>
>Qi
>
>On Sun, Sep 29, 2019 at 4:31 PM allan <18...@163.com> wrote:
>
>> Hi,
>> 发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。
>>
>> java.lang.NullPointerException
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>         at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>         at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>>         at org.apache.flink.streaming.runtime.io
>> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>>
>> 经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数 record为空的情况下会报异常。
>> 难道map 不能返回null值吗?
>>
>>
>> @Override
>>    protected <X> void pushToOperator(StreamRecord<X> record) {
>>       try {
>>          // we know that the given outputTag matches our OutputTag so the
>> record
>>          // must be of the type that our operator (and Serializer) expects.
>>          @SuppressWarnings("unchecked")
>>          StreamRecord<T> castRecord = (StreamRecord<T>) record;
>>
>>          numRecordsIn.inc();
>>          StreamRecord<T> copy =
>> castRecord.copy(serializer.copy(castRecord.getValue()));
>>          operator.setKeyContextElement1(copy);
>>          operator.processElement(copy);
>>       } catch (ClassCastException e) {
>>          if (outputTag != null) {
>>             // Enrich error message
>>             ClassCastException replace = new ClassCastException(
>>                String.format(
>>                   "%s. Failed to push OutputTag with id '%s' to operator.
>> " +
>>                      "This can occur when multiple OutputTags with
>> different types " +
>>                      "but identical names are being used.",
>>                   e.getMessage(),
>>                   outputTag.getId()));
>>
>>             throw new ExceptionInChainedOperatorException(replace);
>>          } else {
>>             throw new ExceptionInChainedOperatorException(e);
>>          }
>>       } catch (Exception e) {
>>          throw new ExceptionInChainedOperatorException(e);
>>       }
>>
>>    }
>> }
>>
>>
>>

Re: map不能返回null值吗

Posted by Qi Luo <lu...@gmail.com>.
Hi Allan,

map只能返回非null,你可以考虑使用flatMap。

Qi

On Sun, Sep 29, 2019 at 4:31 PM allan <18...@163.com> wrote:

> Hi,
> 发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。
>
> java.lang.NullPointerException
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>         at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
>
>
> 经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数 record为空的情况下会报异常。
> 难道map 不能返回null值吗?
>
>
> @Override
>    protected <X> void pushToOperator(StreamRecord<X> record) {
>       try {
>          // we know that the given outputTag matches our OutputTag so the
> record
>          // must be of the type that our operator (and Serializer) expects.
>          @SuppressWarnings("unchecked")
>          StreamRecord<T> castRecord = (StreamRecord<T>) record;
>
>          numRecordsIn.inc();
>          StreamRecord<T> copy =
> castRecord.copy(serializer.copy(castRecord.getValue()));
>          operator.setKeyContextElement1(copy);
>          operator.processElement(copy);
>       } catch (ClassCastException e) {
>          if (outputTag != null) {
>             // Enrich error message
>             ClassCastException replace = new ClassCastException(
>                String.format(
>                   "%s. Failed to push OutputTag with id '%s' to operator.
> " +
>                      "This can occur when multiple OutputTags with
> different types " +
>                      "but identical names are being used.",
>                   e.getMessage(),
>                   outputTag.getId()));
>
>             throw new ExceptionInChainedOperatorException(replace);
>          } else {
>             throw new ExceptionInChainedOperatorException(e);
>          }
>       } catch (Exception e) {
>          throw new ExceptionInChainedOperatorException(e);
>       }
>
>    }
> }
>
>
>