You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Cristian Cioriia <> on 2021/02/01 19:39:38 UTC

[Flink SQL] Insert query fails for partitioned table

Hey guys,

I’m trying to create a Kafka backed partitioned table [1] and insert some data into it [2] using the sql-client, but I get the error [3] when doing it. Can you guys help with this? Also, I wanted to add the partition to the table as in [4] as per the documentation, but then the creation of the table failed with the error [5]. Can you let me know why it doesn’t work?

The versions used for our Flink pods deployed in Kubernetes are flink:1.11.1-scala_2.11, as in the Flink sql-training.

Looking forward for your input.


[1] ` CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
/*other fields*/
PARTITIONED BY (partitionByField)
  'connector' = 'kafka',
  'topic' =’topic-name’,
  'properties.bootstrap.servers' = kafka-server:port’,
  '' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'

[2] INSERT INTO KafkaTable SELECT HOP_START(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntStart, HOP_END(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntEnd, COUNT(DISTINCT idField) AS cnt, /*other_fields*/ FROM source_table GROUP BY HOP(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE),  /*other_fields*/

[3] [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: 'default_catalog.default_database. KafkaTable s' is a partitioned table, but the underlying [Kafka universal table sink] DynamicTableSink doesn't implement SupportsPartitioning interface.

[4]  `CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
/*other fields*/,
PARTITIONED BY (partitionByField)
  'connector' = 'kafka',
  'topic' =’topic-name’,
  'properties.bootstrap.servers' = kafka-server:port’,
  '' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'

[5] [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "METADATA" at line 12, column 22.

Cristian Cioriia
Office address
Software Development Engineer
+40751812984 (cell)
Anchor Plaza, Bulevardul Timișoara 26Z, București 061331