You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jim Chen <ch...@gmail.com> on 2020/07/16 02:29:12 UTC
HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast
Hi,
I use Flink 1.10.1 SQL to connect to HBase 1.4.3. When I insert into HBase,
it reports an error like validateSchemaAndApplyImplicitCast, which means that the
query schema and the sink schema are inconsistent.
The main difference is that the query schema contains Row(EXPR$0), whose fields are
all expressions, while the sink schema is Row(device_id). I don't know how to write
the SQL so that it is consistent with HBase's sink schema.
I tried to write SQL like select device_id as rowkey, ROW( device_id as
[cannot write as] ) as f1
The error message is as follows:
[image: image.png]
sample code like:
HBase sink ddl:
String ddlSource = "CREATE TABLE
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
" rowkey STRING,\n" +
" f1 ROW< \n" +
" device_id STRING,\n" +
" pass_id STRING,\n" +
" first_date STRING,\n" +
" first_channel_id STRING,\n" +
" first_app_version STRING,\n" +
" first_server_time STRING,\n" +
" first_server_hour STRING,\n" +
" first_ip_location STRING,\n" +
" first_login_time STRING,\n" +
" sys_can_uninstall STRING,\n" +
" update_date STRING,\n" +
" server_time BIGINT,\n" +
" last_pass_id STRING,\n" +
" last_channel_id STRING,\n" +
" last_app_version STRING,\n" +
" last_date STRING,\n" +
" os STRING,\n" +
" attribution_channel_id STRING,\n" +
" attribution_first_date STRING,\n" +
" p_product STRING,\n" +
" p_project STRING,\n" +
" p_dt STRING\n" +
" >\n" +
") WITH (\n" +
" 'connector.type' = 'hbase',\n" +
" 'connector.version' = '1.4.3',\n" + //
即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
" 'connector.table-name' =
'dw_common_mobile_device_user_mapping_new',\n" +
" 'connector.zookeeper.quorum' = '"+ zookeeperServers
+"',\n" +
" 'connector.zookeeper.znode.parent' = '/hbase143',\n" +
" 'connector.write.buffer-flush.max-size' = '2mb',\n" +
" 'connector.write.buffer-flush.max-rows' = '1000',\n" +
" 'connector.write.buffer-flush.interval' = '2s'\n" +
")";
insert into sql:
String bodyAndLocalSql = "" +
// "insert into
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
"SELECT CAST(rowkey AS STRING) AS rowkey, " +
" ROW(" +
" device_id, pass_id, first_date, first_channel_id,
first_app_version, first_server_time, first_server_hour, first_ip_location,
first_login_time, sys_can_uninstall, update_date, server_time,
last_pass_id, last_channel_id, last_app_version, last_date, os,
attribution_channel_id, attribution_first_date, p_product, p_project, p_dt
" +
") AS f1" +
" FROM " +
"(" +
" SELECT " +
" MD5(CONCAT_WS('|', kafka.uid, kafka.p_product,
kafka.p_project)) AS rowkey, " +
" kafka.uid AS device_id " +
",kafka.pass_id " +
// first_date
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
// 老用户
" ELSE hbase.first_date END AS first_date " +
// first_channel_id
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.wlb_channel_id" +
// 老用户
" ELSE hbase.first_channel_id END AS first_channel_id " +
// first_app_version
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.app_version " +
// 老用户
" ELSE hbase.first_app_version END AS first_app_version " +
// first_server_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd
HH:mm:ss') " +
// 老用户
" ELSE hbase.first_server_time END AS first_server_time " +
// first_server_hour
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
// 老用户
" ELSE hbase.first_server_hour END AS first_server_hour " +
// first_ip_location
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.ip_location " +
// 老用户
" ELSE hbase.first_ip_location END AS first_ip_location " +
// first_login_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd
HH:mm:ss') " +
// 老用户
" ELSE hbase.first_login_time END AS first_login_time " +
",kafka.sys_can_uninstall " +
// update_date
",CASE WHEN hbase.pass_id = 0 " +
" THEN CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd')
AS string) " +
" END AS update_date " + // VARCHAR(2000)
// server_time
",kafka.server_time" +
", kafka.pass_id AS last_pass_id" +
", kafka.wlb_channel_id AS last_channel_id" +
", kafka.app_version AS last_app_version" +
", CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS
STRING) AS last_date" + // VARCHAR(2000)
", kafka.os" +
", hbase.attribution_channel_id" +
", hbase.attribution_first_date" +
", kafka.p_product" +
", kafka.p_project" +
", kafka.p_dt" +
" FROM test_hive_catalog.test_ods.test_ods_header AS kafka
" +
" FULL JOIN
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping AS hbase " +
" ON kafka.uid = hbase.device_id " + // TODO
这里uid,后面要改成device_id
" WHERE kafka.is_body=1 AND kafka.is_local=1" +
")";
Re: HELP!!! Flink1.10 sql insert into HBase, error like
validateSchemaAndApplyImplicitCast
Posted by Danny Chan <yu...@gmail.com>.
I suspect there is some inconsistency in the nullability of the whole record field. Can you compare the two schemas and see the diff? For a table, you can get the TableSchema first and print it out.
Best,
Danny Chan
在 2020年7月16日 +0800 AM10:56,Leonard Xu <xb...@gmail.com>,写道:
> Hi, Jim
>
> Could you post error message in text that contains the entire schema of query and sink? I doubt there are some fields type were mismatched.
>
> Best,
> Leonard Xu
>
> > 在 2020年7月16日,10:29,Jim Chen <ch...@gmail.com> 写道:
> >
> > Hi,
> > I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, report an error like validateSchemaAndApplyImplicitCast. Means that the Query Schema and Sink Schema are inconsistent.
> > Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink Schema is Row(device_id). I don't know how to write in sql to be consistent with hbase's sink schema.
> > I try to write sql like select device_id as rowkey, ROW( device_id as [cannot write as] ) as f1
> >
> > error message as follow:
> > <image.png>
> >
> > sample code like:
> > HBase sink ddl:
> > String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
> > " rowkey STRING,\n" +
> > " f1 ROW< \n" +
> > " device_id STRING,\n" +
> > " pass_id STRING,\n" +
> > " first_date STRING,\n" +
> > " first_channel_id STRING,\n" +
> > " first_app_version STRING,\n" +
> > " first_server_time STRING,\n" +
> > " first_server_hour STRING,\n" +
> > " first_ip_location STRING,\n" +
> > " first_login_time STRING,\n" +
> > " sys_can_uninstall STRING,\n" +
> > " update_date STRING,\n" +
> > " server_time BIGINT,\n" +
> > " last_pass_id STRING,\n" +
> > " last_channel_id STRING,\n" +
> > " last_app_version STRING,\n" +
> > " last_date STRING,\n" +
> > " os STRING,\n" +
> > " attribution_channel_id STRING,\n" +
> > " attribution_first_date STRING,\n" +
> > " p_product STRING,\n" +
> > " p_project STRING,\n" +
> > " p_dt STRING\n" +
> > " >\n" +
> > ") WITH (\n" +
> > " 'connector.type' = 'hbase',\n" +
> > " 'connector.version' = '1.4.3',\n" + // 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
> > " 'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" +
> > " 'connector.zookeeper.quorum' = '"+ zookeeperServers +"',\n" +
> > " 'connector.zookeeper.znode.parent' = '/hbase143',\n" +
> > " 'connector.write.buffer-flush.max-size' = '2mb',\n" +
> > " 'connector.write.buffer-flush.max-rows' = '1000',\n" +
> > " 'connector.write.buffer-flush.interval' = '2s'\n" +
> > ")";
> >
> > insert into sql:
> >
> > String bodyAndLocalSql = "" +
> > // "insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
> > "SELECT CAST(rowkey AS STRING) AS rowkey, " +
> > " ROW(" +
> > " device_id, pass_id, first_date, first_channel_id, first_app_version, first_server_time, first_server_hour, first_ip_location, first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, last_channel_id, last_app_version, last_date, os, attribution_channel_id, attribution_first_date, p_product, p_project, p_dt " +
> > ") AS f1" +
> > " FROM " +
> > "(" +
> > " SELECT " +
> > " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " +
> > " kafka.uid AS device_id " +
> > ",kafka.pass_id " +
> >
> > // first_date
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
> > // 老用户
> > " ELSE hbase.first_date END AS first_date " +
> >
> > // first_channel_id
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN kafka.wlb_channel_id" +
> > // 老用户
> > " ELSE hbase.first_channel_id END AS first_channel_id " +
> >
> > // first_app_version
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN kafka.app_version " +
> > // 老用户
> > " ELSE hbase.first_app_version END AS first_app_version " +
> >
> > // first_server_time
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
> > // 老用户
> > " ELSE hbase.first_server_time END AS first_server_time " +
> >
> > // first_server_hour
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
> > // 老用户
> > " ELSE hbase.first_server_hour END AS first_server_hour " +
> >
> > // first_ip_location
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN kafka.ip_location " +
> > // 老用户
> > " ELSE hbase.first_ip_location END AS first_ip_location " +
> >
> > // first_login_time
> > ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> > // 新用户
> > " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
> > // 老用户
> > " ELSE hbase.first_login_time END AS first_login_time " +
> >
> > ",kafka.sys_can_uninstall " +
> >
> > // update_date
> > ",CASE WHEN hbase.pass_id = 0 " +
> > " THEN CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS string) " +
> > " END AS update_date " + // VARCHAR(2000)
> >
> > // server_time
> > ",kafka.server_time" +
> >
> > ", kafka.pass_id AS last_pass_id" +
> > ", kafka.wlb_channel_id AS last_channel_id" +
> > ", kafka.app_version AS last_app_version" +
> > ", CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS STRING) AS last_date" + // VARCHAR(2000)
> > ", kafka.os" +
> > ", hbase.attribution_channel_id" +
> > ", hbase.attribution_first_date" +
> > ", kafka.p_product" +
> > ", kafka.p_project" +
> > ", kafka.p_dt" +
> >
> > " FROM test_hive_catalog.test_ods.test_ods_header AS kafka " +
> > " FULL JOIN test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping AS hbase " +
> > " ON kafka.uid = hbase.device_id " + // TODO 这里uid,后面要改成device_id
> > " WHERE kafka.is_body=1 AND kafka.is_local=1" +
> > ")";
> >
> >
>
Re: HELP!!! Flink1.10 sql insert into HBase, error like
validateSchemaAndApplyImplicitCast
Posted by Leonard Xu <xb...@gmail.com>.
Hi, Jim
Could you post the error message as text, including the entire schema of the query and the sink? I suspect that some field types are mismatched.
Best,
Leonard Xu
> 在 2020年7月16日,10:29,Jim Chen <ch...@gmail.com> 写道:
>
> Hi,
> I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, report an error like validateSchemaAndApplyImplicitCast. Means that the Query Schema and Sink Schema are inconsistent.
> Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink Schema is Row(device_id). I don't know how to write in sql to be consistent with hbase's sink schema.
> I try to write sql like select device_id as rowkey, ROW( device_id as [cannot write as] ) as f1
>
> error message as follow:
> <image.png>
>
> sample code like:
> HBase sink ddl:
> String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
> " rowkey STRING,\n" +
> " f1 ROW< \n" +
> " device_id STRING,\n" +
> " pass_id STRING,\n" +
> " first_date STRING,\n" +
> " first_channel_id STRING,\n" +
> " first_app_version STRING,\n" +
> " first_server_time STRING,\n" +
> " first_server_hour STRING,\n" +
> " first_ip_location STRING,\n" +
> " first_login_time STRING,\n" +
> " sys_can_uninstall STRING,\n" +
> " update_date STRING,\n" +
> " server_time BIGINT,\n" +
> " last_pass_id STRING,\n" +
> " last_channel_id STRING,\n" +
> " last_app_version STRING,\n" +
> " last_date STRING,\n" +
> " os STRING,\n" +
> " attribution_channel_id STRING,\n" +
> " attribution_first_date STRING,\n" +
> " p_product STRING,\n" +
> " p_project STRING,\n" +
> " p_dt STRING\n" +
> " >\n" +
> ") WITH (\n" +
> " 'connector.type' = 'hbase',\n" +
> " 'connector.version' = '1.4.3',\n" + // 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
> " 'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" +
> " 'connector.zookeeper.quorum' = '"+ zookeeperServers +"',\n" +
> " 'connector.zookeeper.znode.parent' = '/hbase143',\n" +
> " 'connector.write.buffer-flush.max-size' = '2mb',\n" +
> " 'connector.write.buffer-flush.max-rows' = '1000',\n" +
> " 'connector.write.buffer-flush.interval' = '2s'\n" +
> ")";
>
> insert into sql:
>
> String bodyAndLocalSql = "" +
> // "insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
> "SELECT CAST(rowkey AS STRING) AS rowkey, " +
> " ROW(" +
> " device_id, pass_id, first_date, first_channel_id, first_app_version, first_server_time, first_server_hour, first_ip_location, first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, last_channel_id, last_app_version, last_date, os, attribution_channel_id, attribution_first_date, p_product, p_project, p_dt " +
> ") AS f1" +
> " FROM " +
> "(" +
> " SELECT " +
> " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " +
> " kafka.uid AS device_id " +
> ",kafka.pass_id " +
>
> // first_date
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
> // 老用户
> " ELSE hbase.first_date END AS first_date " +
>
> // first_channel_id
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN kafka.wlb_channel_id" +
> // 老用户
> " ELSE hbase.first_channel_id END AS first_channel_id " +
>
> // first_app_version
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN kafka.app_version " +
> // 老用户
> " ELSE hbase.first_app_version END AS first_app_version " +
>
> // first_server_time
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
> // 老用户
> " ELSE hbase.first_server_time END AS first_server_time " +
>
> // first_server_hour
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
> // 老用户
> " ELSE hbase.first_server_hour END AS first_server_hour " +
>
> // first_ip_location
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN kafka.ip_location " +
> // 老用户
> " ELSE hbase.first_ip_location END AS first_ip_location " +
>
> // first_login_time
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
> // 新用户
> " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
> // 老用户
> " ELSE hbase.first_login_time END AS first_login_time " +
>
> ",kafka.sys_can_uninstall " +
>
> // update_date
> ",CASE WHEN hbase.pass_id = 0 " +
> " THEN CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS string) " +
> " END AS update_date " + // VARCHAR(2000)
>
> // server_time
> ",kafka.server_time" +
>
> ", kafka.pass_id AS last_pass_id" +
> ", kafka.wlb_channel_id AS last_channel_id" +
> ", kafka.app_version AS last_app_version" +
> ", CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS STRING) AS last_date" + // VARCHAR(2000)
> ", kafka.os" +
> ", hbase.attribution_channel_id" +
> ", hbase.attribution_first_date" +
> ", kafka.p_product" +
> ", kafka.p_project" +
> ", kafka.p_dt" +
>
> " FROM test_hive_catalog.test_ods.test_ods_header AS kafka " +
> " FULL JOIN test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping AS hbase " +
> " ON kafka.uid = hbase.device_id " + // TODO 这里uid,后面要改成device_id
> " WHERE kafka.is_body=1 AND kafka.is_local=1" +
> ")";
>
>