Posted to issues@flink.apache.org by "Henrik Laxhuber (Jira)" <ji...@apache.org> on 2022/07/03 17:13:00 UTC

[jira] [Commented] (FLINK-28367) OffsetDateTime does not work with keyBy

    [ https://issues.apache.org/jira/browse/FLINK-28367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561912#comment-17561912 ] 

Henrik Laxhuber commented on FLINK-28367:
-----------------------------------------

A workaround is to register a custom Kryo serializer for OffsetDateTime that simply writes the value with toString and reads it back with OffsetDateTime.parse:
{code:scala}
import java.time.OffsetDateTime

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Kryo serializer that stores an OffsetDateTime as its ISO-8601 string,
// so the offset is written out and parsed back explicitly.
// Flink requires registered serializer instances to be java.io.Serializable.
class FixedOffsetDateTimeSerializer
    extends Serializer[OffsetDateTime]
    with Serializable {
  override def write(kryo: Kryo, output: Output, obj: OffsetDateTime): Unit =
    output.writeString(obj.toString)

  override def read(
      kryo: Kryo,
      input: Input,
      `type`: Class[OffsetDateTime]
  ): OffsetDateTime =
    OffsetDateTime.parse(input.readString())
}

object MWE {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Tell Flink's Kryo fallback to use the string-based serializer for OffsetDateTime.
    env.getConfig.registerTypeWithKryoSerializer[FixedOffsetDateTimeSerializer](
      classOf[OffsetDateTime],
      new FixedOffsetDateTimeSerializer()
    )

    env
      .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
      .map(OffsetDateTime.parse(_))
      .keyBy((t: OffsetDateTime) => t)
      .print()

    env.execute()
  }
}
{code}
But I feel like this should be working out of the box.
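The workaround depends on toString and OffsetDateTime.parse being exact inverses. The following is a minimal, dependency-free sketch (plain Scala, no Flink or Kryo) of that round trip. It can be checked against a UTC value, a value whose seconds are zero (toString then omits them, e.g. 2022-07-03T15:35+02:00, which parse still accepts) and a value with nanoseconds and a negative half-hour offset. The object OffsetRoundTrip and its methods are hypothetical names used only for illustration:

{code:scala}
import java.time.OffsetDateTime

// Hypothetical helper mirroring FixedOffsetDateTimeSerializer without Kryo:
// encode an OffsetDateTime as its ISO-8601 string and parse it back.
object OffsetRoundTrip {
  def encode(t: OffsetDateTime): String = t.toString

  def decode(s: String): OffsetDateTime = OffsetDateTime.parse(s)

  // OffsetDateTime.equals compares both the local date-time and the offset,
  // so any change to either one makes this return false.
  def roundTrips(t: OffsetDateTime): Boolean = decode(encode(t)) == t
}
{code}

For example, OffsetRoundTrip.roundTrips(OffsetDateTime.parse("2022-07-03T15:35:48.142Z")) is true, and encode returns the same "2022-07-03T15:35:48.142Z" string that was parsed.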

> OffsetDateTime does not work with keyBy
> ---------------------------------------
>
>                 Key: FLINK-28367
>                 URL: https://issues.apache.org/jira/browse/FLINK-28367
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Type Serialization System
>    Affects Versions: 1.15.0
>         Environment: * Java 1.8 (openjdk 1.8.0_322)
>  * Scala 2.12.15
>  * Flink 1.15.0
>            Reporter: Henrik Laxhuber
>            Priority: Major
>
> When keyBy is used, java.time.OffsetDateTime values are (de-)serialized incorrectly: the offset is lost and becomes null.
> Here's a minimal non-working example:
>  
> {code:scala}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import java.time.OffsetDateTime
> object MWE {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env
>       .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
>       .map(OffsetDateTime.parse(_))
>       .keyBy((t: OffsetDateTime) => t)
>       .print()
>     env.execute()
>   }
> } {code}
>  
> Expected Output:
> {code:java}
> 2022-07-03T15:35:48.438Z
> 2022-07-03T15:35:48.142Z{code}
> Actual Output:
> {code:java}
> 2022-07-03T15:35:48.438null
> 2022-07-03T15:35:48.142null{code}
> The issue arises whenever OffsetDateTime values pass through keyBy; I believe it has something to do with the way Flink serializes the data.
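One plausible explanation for the null, offered as an assumption rather than a confirmed diagnosis: OffsetDateTime is not a Flink POJO, so Flink presumably handles it as a generic type through Kryo, and keyBy sends records across a network partition where they are actually serialized. Kryo's default field-based serializer skips transient fields, and in the JDK, ZoneOffset keeps its printable ID in a transient field named id (ZoneOffset.toString returns it), while the numeric offset lives in the non-transient totalSeconds. An offset rebuilt field by field without running its constructor would therefore keep its seconds but print null. The sketch below only inspects field modifiers via reflection; it does not call Flink or Kryo, and the object TransientFields is a hypothetical helper:

{code:scala}
import java.lang.reflect.Modifier

// Hypothetical helper: maps each instance (non-static) field declared by a
// class to a flag telling whether that field is transient. A serializer that
// copies only non-transient fields into an instance created without running
// its constructor leaves the transient ones at their default (null for references).
object TransientFields {
  def instanceFields(cls: Class[_]): Map[String, Boolean] =
    cls.getDeclaredFields.toSeq
      .filterNot(f => Modifier.isStatic(f.getModifiers))
      .map(f => f.getName -> Modifier.isTransient(f.getModifiers))
      .toMap
}
{code}

Calling TransientFields.instanceFields(classOf[java.time.ZoneOffset]) is expected to report id as transient and totalSeconds as not; other fields may differ between JDK versions. The FixedOffsetDateTimeSerializer workaround sidesteps this because OffsetDateTime.parse builds the ZoneOffset through its normal factory path, which sets id.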



--
This message was sent by Atlassian Jira
(v8.20.10#820010)