Posted to dev@flink.apache.org by "Nico Kruber (Jira)" <ji...@apache.org> on 2021/03/02 17:48:00 UTC
[jira] [Created] (FLINK-21569) Flink SQL with CSV file input job hangs
Nico Kruber created FLINK-21569:
-----------------------------------
Summary: Flink SQL with CSV file input job hangs
Key: FLINK-21569
URL: https://issues.apache.org/jira/browse/FLINK-21569
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
Affects Versions: 1.12.1
Reporter: Nico Kruber
Attachments: airports.csv, flights-small2.csv
As a follow-up to FLINK-21567, I also got the job to get stuck on cancellation by doing the following in the SQL client:
* configure the SQL client defaults to run with parallelism 2
* execute the following statements
{code}
CREATE TABLE `airports` (
  `IATA_CODE` CHAR(3),
  `AIRPORT` STRING,
  `CITY` STRING,
  `STATE` CHAR(2),
  `COUNTRY` CHAR(3),
  `LATITUDE` DOUBLE NULL,
  `LONGITUDE` DOUBLE NULL,
  PRIMARY KEY (`IATA_CODE`) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/kaggle-flight-delay/airports.csv',
  'format' = 'csv',
  'csv.allow-comments' = 'true',
  'csv.ignore-parse-errors' = 'true',
  'csv.null-literal' = ''
);
CREATE TABLE `flights` (
  `_YEAR` CHAR(4),
  `_MONTH` CHAR(2),
  `_DAY` CHAR(2),
  `_DAY_OF_WEEK` TINYINT,
  `AIRLINE` CHAR(2),
  `FLIGHT_NUMBER` SMALLINT,
  `TAIL_NUMBER` CHAR(6),
  `ORIGIN_AIRPORT` CHAR(3),
  `DESTINATION_AIRPORT` CHAR(3),
  `_SCHEDULED_DEPARTURE` CHAR(4),
  `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
  `_DEPARTURE_TIME` CHAR(4),
  `DEPARTURE_DELAY` SMALLINT,
  `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
  `TAXI_OUT` SMALLINT,
  `WHEELS_OFF` CHAR(4),
  `SCHEDULED_TIME` SMALLINT,
  `ELAPSED_TIME` SMALLINT,
  `AIR_TIME` SMALLINT,
  `DISTANCE` SMALLINT,
  `WHEELS_ON` CHAR(4),
  `TAXI_IN` SMALLINT,
  `SCHEDULED_ARRIVAL` CHAR(4),
  `ARRIVAL_TIME` CHAR(4),
  `ARRIVAL_DELAY` SMALLINT,
  `DIVERTED` BOOLEAN,
  `CANCELLED` BOOLEAN,
  `CANCELLATION_REASON` CHAR(1),
  `AIR_SYSTEM_DELAY` SMALLINT,
  `SECURITY_DELAY` SMALLINT,
  `AIRLINE_DELAY` SMALLINT,
  `LATE_AIRCRAFT_DELAY` SMALLINT,
  `WEATHER_DELAY` SMALLINT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv',
  'format' = 'csv',
  'csv.null-literal' = ''
);
SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS`
FROM (
  SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`,
    ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum
  FROM flights, airports
  WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0
  GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`)
WHERE rownum <= 10;
{code}
Results are shown in the CLI, but after quitting the result view the job seems to be stuck in CANCELLING until (at least) one of the TMs shuts itself down because a task did not react to the cancelling signal. The following appears in that TM's logs:
{code}
2021-03-02 18:39:19,451 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: TableSourceScan(table=[[default_catalog, default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
java.lang.Thread.run(Thread.java:748)
...
2021-03-02 18:39:49,447 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-03-02 18:39:49,448 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
{code}
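A note on the first stack trace: the task thread is parked in CompletableFuture.join(), called from StreamTask.cleanUpInvoke. Unlike get(), join() does not throw InterruptedException and keeps waiting when its thread is interrupted. Assuming cancellation relies on interrupting the task thread, that would be consistent with the task not reacting to the signal while the future it waits on never completes. The sketch below only illustrates this JDK behaviour in isolation; it is not Flink code, and the class and method names (JoinInterruptDemo, stillBlockedAfterInterrupt) are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

public class JoinInterruptDemo {

    // Returns true if a thread blocked in CompletableFuture.join()
    // is still blocked after being interrupted.
    static boolean stillBlockedAfterInterrupt() throws InterruptedException {
        // Hypothetical stand-in for a future that is never completed by anyone else.
        CompletableFuture<Void> never = new CompletableFuture<>();

        Thread worker = new Thread(() -> never.join(), "worker");
        worker.setDaemon(true);
        worker.start();

        Thread.sleep(200);   // give the worker time to park inside join()
        worker.interrupt();  // analogous to an interrupt-based cancel signal
        worker.join(500);    // wait briefly to see whether the worker exits

        boolean blocked = worker.isAlive();

        // Release the worker so the demo itself terminates cleanly.
        never.complete(null);
        worker.join(1000);
        return blocked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("still blocked after interrupt: " + stillBlockedAfterInterrupt());
    }
}
```

This says nothing about why the future in cleanUpInvoke is not completed in the scenario above; it only shows that interrupts alone cannot unblock a thread waiting in join().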