You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Henrik Laxhuber (Jira)" <ji...@apache.org> on 2022/07/03 15:52:00 UTC

[jira] [Updated] (FLINK-28367) OffsetDateTime does not work with keyBy

     [ https://issues.apache.org/jira/browse/FLINK-28367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Henrik Laxhuber updated FLINK-28367:
------------------------------------
    Description: 
Using keyBy incorrectly (de-)serializes java.time.OffsetDateTime types - the offset gets lost and becomes null.

Here's a minimal non-working example:

 
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

import java.time.OffsetDateTime

object MWE {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
      .map(OffsetDateTime.parse(_))
      .keyBy((t: OffsetDateTime) => t)
      .print()

    env.execute()
  }
} {code}
 

Expected Output:
{code:java}
2022-07-03T15:35:48.438Z
2022-07-03T15:35:48.142Z{code}
Actual Output:
{code:java}
2022-07-03T15:35:48.438null
2022-07-03T15:35:48.142null{code}
The issue arises whenever keyBy and OffsetDateTime are involved; I believe it could have something to do with the way that flink serializes the state.

  was:
Using `keyBy` incorrectly (de-)serializes `java.time.OffsetDateTime` types - the offset gets lost and becomes `null`.

Here's a minimal non-working example:

 
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

import java.time.OffsetDateTime

object MWE {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
      .map(OffsetDateTime.parse(_))
      .keyBy((t: OffsetDateTime) => t)
      .print()

    env.execute()
  }
} {code}
 

 


> OffsetDateTime does not work with keyBy
> ---------------------------------------
>
>                 Key: FLINK-28367
>                 URL: https://issues.apache.org/jira/browse/FLINK-28367
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Type Serialization System
>    Affects Versions: 1.15.0
>         Environment: * Java 1.8 (openjdk 1.8.0_322)
>  * Scala 2.12.15
>  * Flink 1.15.0
>            Reporter: Henrik Laxhuber
>            Priority: Major
>
> Using keyBy incorrectly (de-)serializes java.time.OffsetDateTime types - the offset gets lost and becomes null.
> Here's a minimal non-working example:
>  
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import java.time.OffsetDateTime
> object MWE {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env
>       .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
>       .map(OffsetDateTime.parse(_))
>       .keyBy((t: OffsetDateTime) => t)
>       .print()
>     env.execute()
>   }
> } {code}
>  
> Expected Output:
> {code:java}
> 2022-07-03T15:35:48.438Z
> 2022-07-03T15:35:48.142Z{code}
> Actual Output:
> {code:java}
> 2022-07-03T15:35:48.438null
> 2022-07-03T15:35:48.142null{code}
> The issue arises whenever keyBy and OffsetDateTime are involved; I believe it could have something to do with the way that flink serializes the state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)