Posted to user-zh@flink.apache.org by Zhou Zach <is...@foxmail.com> on 2020/06/16 13:14:40 UTC
Reply: Re: flink sql read hbase sink mysql data type not match
2020-06-16 21:01:09,756 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
2020-06-16 21:01:09,757 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): user_behavior-0
2020-06-16 21:01:09,765 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - Cluster ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
2020-06-16 21:01:09,767 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO org.apache.flink.addons.hbase.HBaseLookupFunction - start close ...
2020-06-16 21:01:35,906 INFO org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient - Close zookeeper connection 0x72d39885 to cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO org.apache.flink.addons.hbase.HBaseLookupFunction - end close.
2020-06-16 21:01:35,908 INFO org.apache.zookeeper.ZooKeeper - Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
2020-06-16 21:01:35,911 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(uid, phoneType, clickCount, time) -> SourceConversion(table=[default_catalog.default_database.user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType, clickCount, time]) -> Calc(select=[uid, time]) -> LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]], joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time, rowkey, cf]) -> Calc(select=[CAST(time) AS time, cf.age AS age]) -> SinkConversionToTuple2 -> Sink: JDBCUpsertTableSink(time, age) (1/2) (e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
at LookupFunction$12.flatMap(Unknown Source)
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at StreamExecCalc$7.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SourceConversion$6.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Query:
val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "cdh1:2181,cdh2:2181,cdh3:2181")
hConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
val users = new HBaseTableSource(hConf, "user_hbase5")
users.setRowKey("rowkey", classOf[String]) // rowkey is the primary key
users.addColumn("cf", "age", classOf[Integer])
streamTableEnv.registerTableSource("users", users)
streamTableEnv.sqlUpdate(
"""
|
|insert into time_age
|SELECT
| cast(b.`time` as string) as `time`, u.cf.age
|FROM
| (select * , PROCTIME() AS proctime from user_behavior) AS b
| JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
| ON b.uid = u.rowkey
|
|""".stripMargin)
offset (0) + length (4) exceed the capacity of the array: 2
Does this error mean the value read from HBase is a primitive int? Is converting it with users.addColumn("cf", "age", classOf[Integer]) the wrong approach? How do I convert int to Integer, or Integer to int?
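(Side note on what the error itself says, independent of the int/Integer question: HBase's Bytes.toInt requires a 4-byte cell value, but the message "capacity of the array: 2" shows the stored 'age' cell is only 2 bytes long, which typically happens when the value was written as a string such as "18" rather than with Bytes.toBytes(int). A minimal sketch of that length check, using a hypothetical helper name and assuming HBase's big-endian int encoding:)

```python
import struct

def hbase_bytes_to_int(data: bytes) -> int:
    """Rough analogue of org.apache.hadoop.hbase.util.Bytes.toInt:
    an INT column must deserialize exactly 4 bytes."""
    if len(data) < 4:
        # mirrors the exception text seen in the stack trace above
        raise ValueError(
            f"offset (0) + length (4) exceed the capacity of the array: {len(data)}")
    return struct.unpack(">i", data[:4])[0]  # big-endian 32-bit int, as HBase writes it

# Written with Bytes.toBytes(18): a 4-byte value, decodes fine.
ok = hbase_bytes_to_int(struct.pack(">i", 18))

# Written as the string "18" (e.g. via hbase shell): only 2 bytes,
# so decoding as INT raises, matching the error in this thread.
# hbase_bytes_to_int(b"18")
```

If that is the cause, the fix is on the write side (store the column with a 4-byte int encoding) rather than in the addColumn type parameter.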
------------------ Original Message ------------------
From: "libenchao"<libenchao@apache.org>;
Sent: Tuesday, June 16, 2020, 7:56 PM
To: "user-zh"<user-zh@flink.apache.org>;
Subject: Re: Re: flink sql read hbase sink mysql data type not match
You can't cast it directly. ROW is a composite type; to access one of its fields, use the `.` operator.
For your scenario, that would be: SELECT rowkey, cf.age FROM users
Zhou Zach <ispmd@foxmail.com> wrote on Tue, Jun 16, 2020 at 6:59 PM:
> In Flink SQL, how do I convert ROW<`age` INT> to INT?
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |insert into user_age
> |SELECT rowkey, cast(cf as int) as age
> |FROM
> | users
> |
> |""".stripMargin)
Trying this throws an error.