Posted to user@flink.apache.org by Cristian Cioriia <ci...@adobe.com> on 2021/02/01 19:39:38 UTC

[Flink SQL] Insert query fails for partitioned table

Hey guys,

I’m trying to create a Kafka-backed partitioned table [1] and insert some data into it [2] using the sql-client, but the insert fails with the error [3]. Can you guys help with this? I also wanted to add the partition column to the table as in [4], per the documentation, but then the creation of the table failed with the error [5]. Can you let me know why it doesn’t work?

Our Flink pods are deployed in Kubernetes from the flink:1.11.1-scala_2.11 image, as in the Flink sql-training.

Looking forward to your input.

Thanks,
Cristi

[1] CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
/*other fields*/
)
PARTITIONED BY (partitionByField)
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

[2] INSERT INTO KafkaTable
SELECT
  HOP_START(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntStart,
  HOP_END(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntEnd,
  COUNT(DISTINCT idField) AS cnt,
  /*other_fields*/
FROM source_table
GROUP BY
  HOP(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE),
  /*other_fields*/

[3] [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: 'default_catalog.default_database.KafkaTable' is a partitioned table, but the underlying [Kafka universal table sink] DynamicTableSink doesn't implement SupportsPartitioning interface.
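For context, a possible variant of [1] that avoids the error above: the Kafka table sink in 1.11 does not implement SupportsPartitioning, so PARTITIONED BY cannot be used; Kafka-side partitioning can instead be steered through the connector's 'sink.partitioner' option. This is a sketch under that assumption, reusing the placeholders from [1]:

```sql
-- Sketch (assumes the Flink 1.11 Kafka SQL connector): no PARTITIONED BY clause;
-- 'sink.partitioner' controls how records are spread over Kafka partitions.
CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING
  /*other fields*/
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  -- 'fixed': each Flink sink subtask writes to one Kafka partition;
  -- alternatively, the fully qualified class name of a custom FlinkKafkaPartitioner.
  'sink.partitioner' = 'fixed'
);
```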

[4] CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
/*other fields*/,
  `partition` BIGINT METADATA VIRTUAL
)
PARTITIONED BY (partitionByField)
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

[5] [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "METADATA" at line 12, column 22.
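The ParseException above is expected on 1.11: metadata columns (the METADATA VIRTUAL syntax) were only added in Flink 1.12 with FLIP-107, so the 1.11.1 parser stops at the keyword. A sketch of the same column declaration on a 1.12+ image, reusing the placeholders from [4]:

```sql
-- Requires Flink 1.12+ (FLIP-107); the 1.11 SQL parser rejects METADATA.
CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
  /*other fields*/
  -- Kafka partition the record was read from; VIRTUAL = read-only, excluded on insert.
  `partition` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Note the sketch drops PARTITIONED BY, since inserting into the table would otherwise still hit the SupportsPartitioning error in [3].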



Cristian Cioriia
Software Development Engineer
Adobe
Anchor Plaza, Bulevardul Timișoara 26Z, București 061331
+40751812984 (cell)
cioriia@adobe.com
www.adobe.com