You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jaume (Jira)" <ji...@apache.org> on 2022/05/17 08:24:00 UTC
[jira] [Commented] (FLINK-25916) Using upsert-kafka with a flush buffer results in Null Pointer Exception
[ https://issues.apache.org/jira/browse/FLINK-25916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538026#comment-17538026 ]
Jaume commented on FLINK-25916:
-------------------------------
We have been experiencing the same issue with our Flink jobs.
If we add `{color:#6a8759}sink.buffer-flush.interval{color}{color:#cc7832}` & `{color}{color:#6a8759}sink.buffer-flush.max-rows` into our SQL Flink jobs (using `upsert-kafka`), they start raising `NullpointerException` in versions 1.14.3 & 1.15. We had to rollback to Flink 1.13.2 to make them work again
{color}
> Using upsert-kafka with a flush buffer results in Null Pointer Exception
> ------------------------------------------------------------------------
>
> Key: FLINK-25916
> URL: https://issues.apache.org/jira/browse/FLINK-25916
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / Runtime
> Affects Versions: 1.15.0, 1.14.3
> Environment: CentOS 7.9 x64
> Intel Xeon Gold 6140 CPU
> Reporter: Corey Shaw
> Priority: Major
>
> Flink Version: 1.14.3
> upsert-kafka version: 1.14.3
>
> I have been trying to buffer output from the upsert-kafka connector using the documented parameters {{sink.buffer-flush.max-rows}} and {{sink.buffer-flush.interval}}
> Whenever I attempt to run an INSERT query with buffering, I receive the following error (shortened for brevity):
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
> at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.lambda$registerFlush$3(ReducingUpsertWriter.java:124)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:829) [?:?] {code}
>
> If I remove the parameters related to flush buffering, then everything works as expected with no problems at all. For reference, here is the full setup with source, destination, and queries. Yes, I realize the INSERT could use an overhaul, but that's not the issue at hand :).
> {code:java}
> CREATE TABLE `source_topic` (
> `timeGMT` INT,
> `eventtime` AS TO_TIMESTAMP(FROM_UNIXTIME(`timeGMT`)),
> `visIdHigh` BIGINT,
> `visIdLow` BIGINT,
> `visIdStr` AS CONCAT(IF(`visIdHigh` IS NULL, '', CAST(`visIdHigh` AS STRING)), IF(`visIdLow` IS NULL, '', CAST(`visIdLow` AS STRING))),
> WATERMARK FOR eventtime AS eventtime - INTERVAL '25' SECONDS
> ) WITH (
> 'connector' = 'kafka',
> 'properties.group.id' = 'flink_metrics',
> 'properties.bootstrap.servers' = 'brokers.example.com:9093',
> 'topic' = 'source_topic',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.url' = 'http://schema.example.com',
> 'value.fields-include' = 'EXCEPT_KEY'
> );
> CREATE TABLE dest_topic (
> `messageType` VARCHAR,
> `observationID` BIGINT,
> `obsYear` BIGINT,
> `obsMonth` BIGINT,
> `obsDay` BIGINT,
> `obsHour` BIGINT,
> `obsMinute` BIGINT,
> `obsTz` VARCHAR(5),
> `value` BIGINT,
> PRIMARY KEY (observationID, messageType) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'key.format' = 'json',
> 'properties.bootstrap.servers' = 'brokers.example.com:9092',
> 'sink.buffer-flush.max-rows' = '50000',
> 'sink.buffer-flush.interval' = '1000',
> 'topic' = 'dest_topic ',
> 'value.format' = 'json'
> );
> INSERT INTO adobenow_metrics
> SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz, SUM(`value`) AS `value` FROM (
> SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, '-0000' AS obsTz, 1 AS `value`, `visIdStr` FROM (
> SELECT
> 'visit' AS `messageType`,
> CAST(DATE_FORMAT(window_start, 'yyyyMMddHHmm') AS BIGINT) AS `observationID`,
> year(window_start) AS obsYear,
> month(window_start) AS obsMonth,
> dayofmonth(window_start) AS obsDay,
> hour(window_start) AS obsHour,
> minute(window_start) AS obsMinute,
> '-0000' AS obsTz,
> visIdStr
> FROM TABLE(TUMBLE(TABLE `adobenow_sparkweb`, DESCRIPTOR(`eventtime`), INTERVAL '60' SECONDS))
> WHERE visIdStr IS NOT NULL
> GROUP BY window_start, window_end, visIdStr
> )
> GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, `visIdStr`
> )
> GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz;{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)