You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Zhou Zach <is...@foxmail.com> on 2020/06/16 13:14:40 UTC

回复: Re: flink sql read hbase sink mysql data type not match

2020-06-16 21:01:09,756 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser&nbsp; - Kafka version: unknown
2020-06-16 21:01:09,757 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser&nbsp; - Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer&nbsp; - [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): user_behavior-0
2020-06-16 21:01:09,765 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata&nbsp; - Cluster ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig&nbsp; - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser&nbsp; - Kafka version: unknown
2020-06-16 21:01:09,767 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser&nbsp; - Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO&nbsp; org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher&nbsp; - [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO&nbsp; org.apache.flink.addons.hbase.HBaseLookupFunction&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- start close ...
2020-06-16 21:01:35,906 INFO&nbsp; org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Close zookeeper connection 0x72d39885 to cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO&nbsp; org.apache.flink.addons.hbase.HBaseLookupFunction&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- end close.
2020-06-16 21:01:35,908 INFO&nbsp; org.apache.zookeeper.ZooKeeper&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO&nbsp; org.apache.zookeeper.ClientCnxn&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- EventThread shut down
2020-06-16 21:01:35,911 INFO&nbsp; org.apache.flink.runtime.taskmanager.Task&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Source: KafkaTableSource(uid, phoneType, clickCount, time) -&gt; SourceConversion(table=[default_catalog.default_database.user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType, clickCount, time]) -&gt; Calc(select=[uid, time]) -&gt; LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]], joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time, rowkey, cf]) -&gt; Calc(select=[CAST(time) AS time, cf.age AS age]) -&gt; SinkConversionToTuple2 -&gt; Sink: JDBCUpsertTableSink(time, age) (1/2) (e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
	at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
	at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
	at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
	at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
	at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
	at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
	at LookupFunction$12.flatMap(Unknown Source)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at StreamExecCalc$7.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at SourceConversion$6.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)



Query:
val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "cdh1:2181,cdh2:2181,cdh3:2181")
hConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")

val users = new HBaseTableSource(hConf, "user_hbase5")
users.setRowKey("rowkey", classOf[String]) // currency as the primary key
users.addColumn("cf", "age", classOf[Integer])

streamTableEnv.registerTableSource("users", users)


streamTableEnv.sqlUpdate(
  """
    |
    |insert into  time_age
    |SELECT
    |  cast(b.`time` as string) as `time`,  u.cf.age
    |FROM
    |  (select * , PROCTIME() AS proctime from user_behavior) AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |  ON b.uid = u.rowkey
    |
    |""".stripMargin)
offset (0) + length (4) exceed the capacity of the array: 2
这个错误提示 是不是 hbase取出来的int类型,
用users.addColumn("cf", "age", classOf[Integer]) 来转换是不是不对,
怎么把int转换成Integer呢或者把Integer转换成int




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"libenchao"<libenchao@apache.org&gt;;
发送时间:&nbsp;2020年6月16日(星期二) 晚上7:56
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: Re: flink sql read hbase sink mysql data type not match



不能直接cast,ROW类型是一个复合类型,要获取其中的某个字段,可以用`.`来获取。
比如你现在这个场景,就是 SELECT rowkey, cf.age FROM users

Zhou Zach <ispmd@foxmail.com&gt; 于2020年6月16日周二 下午6:59写道:

&gt; flink sql 怎么将ROW<`age` INT&amp;gt;转换成INT啊
&gt;
&gt;
&gt; streamTableEnv.sqlUpdate(
&gt;&nbsp;&nbsp; """
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_age
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |SELECT rowkey, cast(cf as int) as age
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |FROM
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp; users
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |
&gt;&nbsp;&nbsp;&nbsp;&nbsp; |""".stripMargin)这样尝试报错了