You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2021/10/30 10:40:00 UTC

[jira] [Updated] (FLINK-20772) RocksDBValueState with TTL occurs NullPointerException when calling update(null) method

     [ https://issues.apache.org/jira/browse/FLINK-20772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-20772:
-----------------------------------
    Labels: auto-deprioritized-major beginner stale-minor  (was: auto-deprioritized-major beginner)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issues has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> RocksDBValueState with TTL occurs NullPointerException when calling update(null) method 
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-20772
>                 URL: https://issues.apache.org/jira/browse/FLINK-20772
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.2
>         Environment: Flink version: 1.11.2
> Flink Cluster: Standalone cluster with 3 Job managers and Task managers on CentOS 7
>            Reporter: Seongbae Chang
>            Priority: Minor
>              Labels: auto-deprioritized-major, beginner, stale-minor
>
> h2. Problem
>  * I use ValueState for my custom trigger and set TTL for these ValueState in RocksDB backend environment.
>  * I found an error when I used this code. I know that ValueState.update(null) works equally to ValueState.clear() in general. Unfortunately, this error occurs after using TTL
> {code:java}
> // My Code
> ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
> {code}
>  * I tested this in Flink 1.11.2, but I think it would be a problem in upper versions.
>  * Plus, I'm a beginner. So, if there is any problem in this discussion issue, please give me advice about that. And I'll fix it! 
> {code:java}
> // Error Stacktrace
> Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB}
> 	... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
> 	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
> 	at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
> 	at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:102)
> 	at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:29)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
> 	... 11 more
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
> 	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
> 	at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
> 	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
> 	... 18 more
> {code}
>  
> h2. Reason
>  * It relates to RocksDBValueState with TTLValueState
>  * In RocksDBValueState(as well as other types of ValueState), *.update(null)* has to be caught in if-clauses(null checking). However, it skips the null checking and then tries to serialize the null value.
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
> @Override
> public void update(V value) { 
>     if (value == null) { 
>         clear(); 
>         return; 
>     }
>  
>     try { 
>         backend.db.put(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value)); 
>     } catch (Exception e) { 
>         throw new FlinkRuntimeException("Error while adding data to RocksDB", e);      
>     }
> }{code}
>  *  It is because that TtlValueState wraps the value(null) with the LastAccessTime and makes the new TtlValue Object with the null value.
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
> @Override
> public void update(T value) throws IOException { 
>     accessCallback.run(); 
>     original.update(wrapWithTs(value));
> }
> {code}
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java#L46-L48
> static <V> TtlValue<V> wrapWithTs(V value, long ts) { 
>     return new TtlValue<>(value, ts);
> }{code}
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
> public class TtlValue<T> implements Serializable {
>     private static final long serialVersionUID = 5221129704201125020L;
>     @Nullable
>     private final T userValue;
>     private final long lastAccessTimestamp;
>     public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
>         this.userValue = userValue;
>         this.lastAccessTimestamp = lastAccessTimestamp;
>     }
>     @Nullable
>     public T getUserValue() {
>         return userValue;
>     }
>     public long getLastAccessTimestamp() {
>         return lastAccessTimestamp;
>     }
> }
> {code}
>  * In conclusion, I think that null checking logic has to be changed for checking whether userValue variable in TtlValue is null or not
>  
> I hope that it would be helpful to improve Flink and if I have a chance, I want to fix it!
> Thank you and have a happy Christmas all!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)