Posted to issues@flink.apache.org by "Henrik Laxhuber (Jira)" <ji...@apache.org> on 2022/07/03 15:52:00 UTC
[jira] [Updated] (FLINK-28367) OffsetDateTime does not work with keyBy
[ https://issues.apache.org/jira/browse/FLINK-28367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Henrik Laxhuber updated FLINK-28367:
------------------------------------
Description:
Applying keyBy to a stream of java.time.OffsetDateTime values (de-)serializes them incorrectly: the offset is lost and is printed as null.
Here's a minimal non-working example:
{code:scala}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import java.time.OffsetDateTime

object MWE {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
      // Parse the ISO-8601 strings; both carry the UTC offset "Z".
      .map(OffsetDateTime.parse(_))
      // Key by the timestamp itself; after this step the offset prints as null.
      .keyBy((t: OffsetDateTime) => t)
      .print()
    env.execute()
  }
}
{code}
Expected Output:
{noformat}
2022-07-03T15:35:48.438Z
2022-07-03T15:35:48.142Z
{noformat}
Actual Output:
{noformat}
2022-07-03T15:35:48.438null
2022-07-03T15:35:48.142null
{noformat}
The issue occurs whenever OffsetDateTime values pass through keyBy; I suspect it is related to how Flink serializes the state.
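If the suspicion above is right and the offset is lost when the records are serialized for the keyBy shuffle, one possible workaround is to carry each timestamp through keyBy as plain numbers and rebuild it afterwards. The Scala sketch below assumes exactly that; EncodedOffsetDateTime, OffsetDateTimeCodec and CodecDemo are hypothetical names, not Flink API. It depends only on java.time and checks that the encoding round-trips, including the Z offset that comes out as null in the actual output.

{code:scala}
import java.time.{Instant, OffsetDateTime, ZoneOffset}

// Hypothetical workaround sketch (not Flink API); assumes the offset is lost
// during serialization. Flattens OffsetDateTime into primitive fields only.
final case class EncodedOffsetDateTime(epochSecond: Long, nano: Int, offsetSeconds: Int)

object OffsetDateTimeCodec {
  // Split the value into the instant it denotes plus the offset in seconds.
  def encode(t: OffsetDateTime): EncodedOffsetDateTime =
    EncodedOffsetDateTime(t.toEpochSecond, t.getNano, t.getOffset.getTotalSeconds)

  // Rebuild through the public factories, so the ZoneOffset is a properly
  // constructed instance (an offset of 0 seconds prints as "Z").
  def decode(e: EncodedOffsetDateTime): OffsetDateTime =
    OffsetDateTime.ofInstant(
      Instant.ofEpochSecond(e.epochSecond, e.nano.toLong),
      ZoneOffset.ofTotalSeconds(e.offsetSeconds)
    )
}

object CodecDemo {
  def main(args: Array[String]): Unit = {
    val inputs = Seq("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438+02:00")
    for (s <- inputs) {
      val original = OffsetDateTime.parse(s)
      val roundTripped = OffsetDateTimeCodec.decode(OffsetDateTimeCodec.encode(original))
      // prints: 2022-07-03T15:35:48.142Z -> 2022-07-03T15:35:48.142Z
      // then:   2022-07-03T15:35:48.438+02:00 -> 2022-07-03T15:35:48.438+02:00
      println(s"$original -> $roundTripped")
    }
  }
}
{code}

In the MWE this would mean mapping each value with OffsetDateTimeCodec.encode before keyBy and calling decode downstream; that has not been verified against Flink 1.15. Alternatively, the same three fields could be written and read by a custom Kryo serializer registered with ExecutionConfig.registerTypeWithKryoSerializer, again assuming the records fall back to Kryo.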
was:
Using `keyBy` incorrectly (de-)serializes `java.time.OffsetDateTime` types - the offset gets lost and becomes `null`.
Here's a minimal non-working example:
{code:scala}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import java.time.OffsetDateTime

object MWE {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
      .map(OffsetDateTime.parse(_))
      .keyBy((t: OffsetDateTime) => t)
      .print()
    env.execute()
  }
}
{code}
> OffsetDateTime does not work with keyBy
> ---------------------------------------
>
> Key: FLINK-28367
> URL: https://issues.apache.org/jira/browse/FLINK-28367
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Type Serialization System
> Affects Versions: 1.15.0
> Environment: * Java 1.8 (openjdk 1.8.0_322)
> * Scala 2.12.15
> * Flink 1.15.0
> Reporter: Henrik Laxhuber
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)