You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "gkgkgk (Jira)" <ji...@apache.org> on 2020/03/16 16:56:00 UTC
[jira] [Created] (FLINK-16622) Type ARRAY of table field
'id' does not match with the physical type LEGACY('ARRAY',
'ANY<[Ljava.lang.String;, rO0ABXNyAD......>') of the 'id' field of the
TableSource return type.
gkgkgk created FLINK-16622:
------------------------------
Summary: Type ARRAY<STRING> of table field 'id' does not match with the physical type LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, rO0ABXNyAD......>') of the 'id' field of the TableSource return type.
Key: FLINK-16622
URL: https://issues.apache.org/jira/browse/FLINK-16622
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.10.0
Reporter: gkgkgk
When I define an ARRAY<VARCHAR> field in the table schema and define the field in 'format.json-schema' like this:
'format.json-schema' = '{
"type": "object",
"properties": {
"id": {
"type": "array",
"items":{"type":"string"}
}
}
}'
then the following error is thrown:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY<STRING> of table field 'id' does not match with the physical type LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, rO0ABXNyADdvc......>') of the 'id' field of the TableSource return type.
The following is my DDL:
-- Source table: reads JSON records from a Kafka topic (repro for FLINK-16622).
-- NOTE(review): the reported ValidationException comes from the mismatch between
-- the logical ARRAY<VARCHAR> declared below and the physical type the JSON
-- format derives from 'format.json-schema' — a LEGACY('ARRAY', 'ANY<String[]>')
-- type — presumably a 1.10 type-system bridging issue; confirm against the fix
-- version of FLINK-16622.
CREATE TABLE dwd_user_log (
id ARRAY<VARCHAR>, -- column whose type fails to match the format-derived physical type
ctime TIMESTAMP(3), -- event time; watermark source below
pageId VARCHAR,
deviceId VARCHAR,
WATERMARK FOR ctime AS ctime - INTERVAL '10' SECOND -- tolerate 10s of out-of-order events
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'dev_dwd_user_log_02',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'node14.example.com:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'node14.example.com:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'dev-group',
'update-mode' = 'append',
'format.type' = 'json',
-- 'format.derive-schema' = 'true' (disabled: explicit json-schema used instead, which triggers the bug)
'format.json-schema' = '{
"type": "object",
"properties": {
"id": {
"type": "array",
"items":{"type":"string"}
},
"ctime": {
"type": "string",
"format": "date-time"
},
"pageId": {
"type": "string"
},
"deviceId": {
"type": "string"
}
}
}'
-- Alternative rowtime wiring via schema properties, kept for reference:
-- 'schema.1.rowtime.timestamps.type' = 'from-field',
-- 'schema.1.rowtime.timestamps.from' = 'ctime',
-- 'schema.1.rowtime.watermarks.type' = 'periodic-bounded',
-- 'schema.1.rowtime.watermarks.delay' = '10000'
-- 'schema.1.from' = 'ctime'
);
-- Sink table for the page-view (pv) aggregation, written back to Kafka as JSON.
-- Unlike the source, this table uses 'format.derive-schema' = 'true', so the
-- JSON schema is derived from the column list and no type mismatch occurs here.
CREATE TABLE dws_pv (
windowStart TIMESTAMP(3), -- tumbling-window start
windowEnd TIMESTAMP(3), -- tumbling-window end
pageId VARCHAR,
id ARRAY<VARCHAR>, -- carried through from the source's problematic column
viewCount BIGINT -- COUNT(deviceId) per window/group
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'dev_dws_pvuv_02',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'node14.example.com:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'node14.example.com:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'dev-group',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
-- Page-view aggregation: 20-second tumbling event-time windows keyed by
-- (pageId, id), counting deviceId occurrences into the dws_pv sink.
INSERT INTO dws_pv
SELECT
    TUMBLE_START(ctime, INTERVAL '20' SECOND) AS windowStart,
    TUMBLE_END(ctime, INTERVAL '20' SECOND) AS windowEnd,
    pageId,
    id,
    COUNT(deviceId) AS viewCount
FROM dwd_user_log
GROUP BY
    TUMBLE(ctime, INTERVAL '20' SECOND),
    pageId,
    id;
--
This message was sent by Atlassian Jira
(v8.3.4#803005)