Posted to user@flink.apache.org by Cristian Cioriia <ci...@adobe.com> on 2021/02/01 19:39:38 UTC
[Flink SQL] Insert query fails for partitioned table
Hey guys,
I’m trying to create a Kafka-backed partitioned table [1] and insert some data into it [2] using the sql-client, but the insert fails with error [3]. Can you guys help with this? I also tried to add the partition to the table as a metadata column, as in [4] and as described in the documentation, but then the table creation failed with error [5]. Can you let me know why it doesn’t work?
Our Flink pods, deployed in Kubernetes, use the flink:1.11.1-scala_2.11 image, as in the Flink sql-training.
Looking forward to your input.
Thanks,
Cristi
[1] CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
  /*other fields*/
)
PARTITIONED BY (partitionByField)
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
[2] INSERT INTO KafkaTable
SELECT
  HOP_START(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntStart,
  HOP_END(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntEnd,
  COUNT(DISTINCT idField) AS cnt,
  /*other_fields*/
FROM source_table
GROUP BY
  HOP(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE),
  /*other_fields*/
[3] [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: 'default_catalog.default_database. KafkaTable s' is a partitioned table, but the underlying [Kafka universal table sink] DynamicTableSink doesn't implement SupportsPartitioning interface.
[4] CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
  /*other fields*/,
  `partition` BIGINT METADATA VIRTUAL
)
PARTITIONED BY (partitionByField)
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
[5] [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "METADATA" at line 12, column 22.
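For comparison, here is a minimal sketch of the same table with the two rejected parts removed: the PARTITIONED BY clause that [3] complains about and the METADATA column that the parser rejects in [5]. It assumes, based only on those two messages, that the Kafka sink in this Flink version does not support table partitioning and that the 1.11 SQL grammar does not know the METADATA keyword. The topic, server and group id values are the same placeholders as in [1], and the other fields are left out. It is a sketch under those assumptions, not a verified fix.

```sql
/* Same columns as in [1]; other fields omitted for brevity. */
CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING
)
/* No PARTITIONED BY clause and no METADATA column (assumed unsupported here). */
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

With this definition partitionByField is only a regular column, so it does not control which Kafka partition a record is written to.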
Cristian Cioriia
Software Development Engineer, Adobe
Tel/cell: +40751812984
Office address: Anchor Plaza, Bulevardul Timișoara 26Z, București 061331
cioriia@adobe.com
www.adobe.com