You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by JackJia <co...@126.com> on 2021/01/12 06:16:10 UTC

Flink代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢

请教个问题,代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢。
错误如下:
2021-01-12 04:36:09,950 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(60000), EventTimeTrigger, WashDataDetectionFunction) -> Map -> Map -> Sink: Unnamed (1/1) (b015c7cebf71e744f6b50136cdc32e20) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1238)
at java.util.ArrayList$SubList.size(ArrayList.java:1048)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.washDetection(WashDataDetectionFunction.java:206)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:94)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:33)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
... 10 more


涉及的代码如下:
private SingleOutputStreamOperator<SockRawEvent> dataClean(DataStream<SockRawEvent> source, ParameterTool pt) {
    WindowedStream<SockRawEvent, String, TimeWindow> windowStream = source.keyBy(x -> x.mac).window(TumblingEventTimeWindows.of(Time.seconds(60)));

SingleOutputStreamOperator<SockRawEvent> afterWashDetection = windowStream.process(new WashDataDetectionFunction());

SingleOutputStreamOperator<SockRawEvent> afterIdleAndShortLiveClean = afterWashDetection.keyBy(x -> x.mac)
            .window(TumblingEventTimeWindows.of(Time.seconds(60))).process(new IdleAndShortLiveDataCleanFunction());


DataStream<SockRowV2> washedData = afterWashDetection.getSideOutput(WashDataDetectionFunction.getWashOutputTag())
            .map(x -> SockRowV2.of(x, SockRawMarkEnum.WASH));
writeToKinesis(pt, washedData);
DataStream<SockRowV2> cleanedData = afterIdleAndShortLiveClean.getSideOutput(IdleAndShortLiveDataCleanFunction.getIdleShortLiveOutputTag())
            .map(x -> SockRowV2.of(x, SockRawMarkEnum.IDLE_SHORT_LIVE));
writeToKinesis(pt, cleanedData);

    return afterIdleAndShortLiveClean;
}
WashDataDetectionFunction类的process函数如下:
@Override
public void process(String s, Context context, Iterable<SockRawEvent> elements, Collector<SockRawEvent> out) throws Exception {
    ArrayList<SockRawEvent> eventList = new ArrayList<>(1600);
    for (SockRawEvent e : elements) {
        eventList.add(e);
}
    eventList = eventList.stream().sorted(new Comparator<SockRawEvent>() {
@Override
public int compare(SockRawEvent o1, SockRawEvent o2) {
return Long.compare(o1.utctime.getMillis(), o2.utctime.getMillis());
}
    }).collect(Collectors.toCollection(ArrayList::new));
//
    //System.out.println(String.format("windows(%d, %d) key: %s, data: %d", context.window().getStart(), context.window().getEnd(), s, eventList.size()));
Map<Long, List<SockRawEvent>> gmap = eventList.stream().collect(Collectors.groupingBy(x -> x.utctime.getMillis() / 180000));
    for (Map.Entry<Long, List<SockRawEvent>> entry : gmap.entrySet())
        washDetection(context,entry.getValue(), out);

}
WashDataDetectionFunction类的temperatureFall方法如下,其中错误中提到的
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)是“if ( null != las3MinuteData && null != (last3m=las3MinuteData.value())) { ”这一行。 
private boolean temperatureFall(List<WashDetectionSockValue> washDetectionSockValueList) throws IOException {
if (ObjectUtil.isEmpty(washDetectionSockValueList)) return false;

    final int temperatureFallThreshold = 3;

List<WashDetectionSockValue> last3m = null;
//时间降序
List<WashDetectionSockValue> socksSorted = washDetectionSockValueList.stream()
            .sorted(new WashDetectionComparator())
            .collect(Collectors.toList());
    double[] t = new double[socksSorted.size()];
    for (int i = 0; i < socksSorted.size(); i++) {
        t[i] = socksSorted.get(i).getAvgTemper();
}
double currVarTempe = StatisticsUtils.getStandardDeviation(t);
    double lastVarTempe = 0;
    long startTime;
    if ( null != las3MinuteData && null != (last3m=las3MinuteData.value())) {
        last3m.addAll(socksSorted);
//更新las3MinuteData
if (last3m.size() < 3 * dataNumPerMinute) {
las3MinuteData.update(last3m);
startTime = last3m.get(0).getUserTime();
} else {
            List<WashDetectionSockValue> last3mTmp = last3m.subList(last3m.size() - 3 * dataNumPerMinute, last3m.size());
las3MinuteData.update(last3mTmp);
startTime = last3mTmp.get(0).getUserTime();
}
if (currVarTempe < temperatureFallThreshold) {
double[] lltt = new double[last3m.size()];
            for (int i = 0; i < last3m.size(); i++) {
                lltt[i] = last3m.get(i).getAvgTemper();
}
            lastVarTempe = StatisticsUtils.getStandardDeviation(lltt);
}
    } else {
//las3MinuteData为空,更新las3MinuteData
if (socksSorted.size() < 3 * dataNumPerMinute) {
            last3m = socksSorted;
} else {
            last3m = socksSorted.subList(socksSorted.size() - 3 * dataNumPerMinute, socksSorted.size());
}
las3MinuteData.update(last3m);
startTime = last3m.get(0).getUserTime();
}
boolean full = lastVarTempe > temperatureFallThreshold || currVarTempe > temperatureFallThreshold;
    if (full) {
sockCacheStartTime.update(startTime);
}
return full;
}










Re: Flink代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢

Posted by Yun Tang <my...@live.com>.
Hi,

这个错误其实是kryo初始化时候扔出来的。你自定义的类 SockRowV2,WashDetectionSockValue 等,不符合Flink关于pojo的定义,所以会回退到使用kryo进行序列化/反序列化。建议将相关类在kryo上进行注册 [1]。特别地,如果是thrift或者protobuf的类,需要单独注册[2],更好的方法其实是建议将你们的自定义类修改为满足Flink的POJO类 [3]


[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#kryo
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#rules-for-pojo-types

祝好
唐云
________________________________
From: JackJia <co...@126.com>
Sent: Tuesday, January 12, 2021 14:16
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: Flink代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢

请教个问题,代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢。
错误如下:
2021-01-12 04:36:09,950 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(60000), EventTimeTrigger, WashDataDetectionFunction) -> Map -> Map -> Sink: Unnamed (1/1) (b015c7cebf71e744f6b50136cdc32e20) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1238)
at java.util.ArrayList$SubList.size(ArrayList.java:1048)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.washDetection(WashDataDetectionFunction.java:206)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:94)
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:33)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
... 10 more


涉及的代码如下:
private SingleOutputStreamOperator<SockRawEvent> dataClean(DataStream<SockRawEvent> source, ParameterTool pt) {
    WindowedStream<SockRawEvent, String, TimeWindow> windowStream = source.keyBy(x -> x.mac).window(TumblingEventTimeWindows.of(Time.seconds(60)));

SingleOutputStreamOperator<SockRawEvent> afterWashDetection = windowStream.process(new WashDataDetectionFunction());

SingleOutputStreamOperator<SockRawEvent> afterIdleAndShortLiveClean = afterWashDetection.keyBy(x -> x.mac)
            .window(TumblingEventTimeWindows.of(Time.seconds(60))).process(new IdleAndShortLiveDataCleanFunction());


DataStream<SockRowV2> washedData = afterWashDetection.getSideOutput(WashDataDetectionFunction.getWashOutputTag())
            .map(x -> SockRowV2.of(x, SockRawMarkEnum.WASH));
writeToKinesis(pt, washedData);
DataStream<SockRowV2> cleanedData = afterIdleAndShortLiveClean.getSideOutput(IdleAndShortLiveDataCleanFunction.getIdleShortLiveOutputTag())
            .map(x -> SockRowV2.of(x, SockRawMarkEnum.IDLE_SHORT_LIVE));
writeToKinesis(pt, cleanedData);

    return afterIdleAndShortLiveClean;
}
WashDataDetectionFunction类的process函数如下:
@Override
public void process(String s, Context context, Iterable<SockRawEvent> elements, Collector<SockRawEvent> out) throws Exception {
    ArrayList<SockRawEvent> eventList = new ArrayList<>(1600);
    for (SockRawEvent e : elements) {
        eventList.add(e);
}
    eventList = eventList.stream().sorted(new Comparator<SockRawEvent>() {
@Override
public int compare(SockRawEvent o1, SockRawEvent o2) {
return Long.compare(o1.utctime.getMillis(), o2.utctime.getMillis());
}
    }).collect(Collectors.toCollection(ArrayList::new));
//
    //System.out.println(String.format("windows(%d, %d) key: %s, data: %d", context.window().getStart(), context.window().getEnd(), s, eventList.size()));
Map<Long, List<SockRawEvent>> gmap = eventList.stream().collect(Collectors.groupingBy(x -> x.utctime.getMillis() / 180000));
    for (Map.Entry<Long, List<SockRawEvent>> entry : gmap.entrySet())
        washDetection(context,entry.getValue(), out);

}
WashDataDetectionFunction类的temperatureFall方法如下,其中错误中提到的
at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)是“if ( null != las3MinuteData && null != (last3m=las3MinuteData.value())) { ”这一行。
private boolean temperatureFall(List<WashDetectionSockValue> washDetectionSockValueList) throws IOException {
if (ObjectUtil.isEmpty(washDetectionSockValueList)) return false;

    final int temperatureFallThreshold = 3;

List<WashDetectionSockValue> last3m = null;
//时间降序
List<WashDetectionSockValue> socksSorted = washDetectionSockValueList.stream()
            .sorted(new WashDetectionComparator())
            .collect(Collectors.toList());
    double[] t = new double[socksSorted.size()];
    for (int i = 0; i < socksSorted.size(); i++) {
        t[i] = socksSorted.get(i).getAvgTemper();
}
double currVarTempe = StatisticsUtils.getStandardDeviation(t);
    double lastVarTempe = 0;
    long startTime;
    if ( null != las3MinuteData && null != (last3m=las3MinuteData.value())) {
        last3m.addAll(socksSorted);
//更新las3MinuteData
if (last3m.size() < 3 * dataNumPerMinute) {
las3MinuteData.update(last3m);
startTime = last3m.get(0).getUserTime();
} else {
            List<WashDetectionSockValue> last3mTmp = last3m.subList(last3m.size() - 3 * dataNumPerMinute, last3m.size());
las3MinuteData.update(last3mTmp);
startTime = last3mTmp.get(0).getUserTime();
}
if (currVarTempe < temperatureFallThreshold) {
double[] lltt = new double[last3m.size()];
            for (int i = 0; i < last3m.size(); i++) {
                lltt[i] = last3m.get(i).getAvgTemper();
}
            lastVarTempe = StatisticsUtils.getStandardDeviation(lltt);
}
    } else {
//las3MinuteData为空,更新las3MinuteData
if (socksSorted.size() < 3 * dataNumPerMinute) {
            last3m = socksSorted;
} else {
            last3m = socksSorted.subList(socksSorted.size() - 3 * dataNumPerMinute, socksSorted.size());
}
las3MinuteData.update(last3m);
startTime = last3m.get(0).getUserTime();
}
boolean full = lastVarTempe > temperatureFallThreshold || currVarTempe > temperatureFallThreshold;
    if (full) {
sockCacheStartTime.update(startTime);
}
return full;
}