Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2019/02/28 15:52:00 UTC

[jira] [Updated] (FLINK-11021) ZoneOffset objects don't appear to be serialized correctly

     [ https://issues.apache.org/jira/browse/FLINK-11021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-11021:
-----------------------------------
    Component/s: API / Type Serialization System

> ZoneOffset objects don't appear to be serialized correctly
> ----------------------------------------------------------
>
>                 Key: FLINK-11021
>                 URL: https://issues.apache.org/jira/browse/FLINK-11021
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.6.2
>         Environment: Scala 2.11.11, OpenJDK 1.8.0_192-b12
>            Reporter: Christina
>            Priority: Major
>         Attachments: TimezoneStreamProcessor.scala
>
>
> In Flink 1.6.2, ZoneOffset objects are not serialized correctly: they come back as `null` after being serialized in a Flink job. I've attached a basic sample job that illustrates the problem, along with a few sbt console commands (below) that also reproduce it.
>  
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import java.time._
> import java.time.temporal.ChronoUnit
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
> val UTCZoneId = ZoneId.of("UTC")
> val UTCZoneOffset = ZoneOffset.UTC
> case class MyTime(
>   timestamp: ZonedDateTime,
>   zoneOffset: ZoneOffset
> ) extends Serializable
> val now = MyTime(ZonedDateTime.now(UTCZoneId).truncatedTo(ChronoUnit.MILLIS), UTCZoneOffset)
> val mytimeTypeInfo = org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[MyTime])
> val ser = mytimeTypeInfo.createSerializer(env.getConfig)
> val out = new org.apache.flink.core.memory.ByteArrayDataOutputView()
> ser.serialize(now, out) // returns Unit; the bytes are written to `out`
> val bytes = out.toByteArray
> val in = new org.apache.flink.core.memory.ByteArrayDataInputView(bytes)
> val deserialized = ser.deserialize(in)
> println(s"""
> before serialization: $now
> after serialization: $deserialized
> """){code}
>  
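As a baseline for comparison, the following minimal sketch round-trips a ZoneOffset through plain java.io serialization, with no Flink classes involved. The object name ZoneOffsetJavaRoundTrip and the helper roundTrip are hypothetical names introduced here for illustration; they are not part of the attached job. It only shows that the JDK's own serialization preserves the value, and it does not say which Flink serializer is responsible for the `null` reported above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.time.ZoneOffset

// Hypothetical helper object, not part of the attached sample job.
object ZoneOffsetJavaRoundTrip {

  // Serialize a value with plain java.io serialization and read it back.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buffer)
    try oos.writeObject(value) finally oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(ZoneOffset.UTC)
    // The restored value should equal the original if JDK serialization is sound.
    println(s"java.io round trip: $restored (equal to original: ${restored == ZoneOffset.UTC})")
  }
}
```

If this baseline passes while the Flink serializer round trip in the issue still yields `null`, that would suggest the problem lies in how Flink's type serialization handles the class rather than in ZoneOffset itself.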


