Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/04/13 02:59:00 UTC
[jira] [Commented] (FLINK-17097) Flink HBase Connector String field size at least equal 8
[ https://issues.apache.org/jira/browse/FLINK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17082006#comment-17082006 ]
Jark Wu commented on FLINK-17097:
---------------------------------
I think we should collect nothing if the rowkey is invalid. Would you like to take this issue? [~xingoo]
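A minimal sketch of that idea (hypothetical, not the actual connector fix): validate the serialized rowkey length before deserializing, and emit nothing when the key cannot be read as a long. The `isValidLongRowKey` helper and the 8-byte constant are my assumptions for illustration, not names from the Flink code base.

```java
// Hypothetical guard (sketch only): a LONG-typed rowkey must be exactly
// 8 bytes, because Bytes.toLong reads 8 bytes. Invalid keys are skipped
// instead of letting deserialization throw.
public class RowKeyGuard {

    static final int LONG_BYTES = 8; // Bytes.toLong requires 8 bytes

    /** Returns true if the rowkey can be deserialized as a long. */
    static boolean isValidLongRowKey(byte[] rowKey) {
        return rowKey != null && rowKey.length == LONG_BYTES;
    }

    public static void main(String[] args) {
        byte[] shortKey = "1".getBytes();    // 1 byte, as in the report
        byte[] okKey = new byte[LONG_BYTES]; // 8 bytes

        // The lookup function would "collect nothing" in the first case.
        System.out.println(isValidLongRowKey(shortKey)); // false
        System.out.println(isValidLongRowKey(okKey));    // true
    }
}
```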
> Flink HBase Connector String field size at least equal 8
> --------------------------------------------------------
>
> Key: FLINK-17097
> URL: https://issues.apache.org/jira/browse/FLINK-17097
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Affects Versions: 1.10.0
> Reporter: xingoo
> Priority: Major
>
> When using a string field as the rowkey in the HBase connector, the rowkey must be at least 8 bytes long, because the byte[] is read with Bytes.toLong, which requires 8 bytes.
> example:
> {code:java}
> // code placeholder
> rowkey: "1"
> {code}
> when using it as lookup function:
> {code:java}
> // code placeholder
> Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 1
> at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:779)
> at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:753)
> at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:726)
> at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:57)
> at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
> at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
> at StreamExecCorrelate$144.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at StreamExecCalc$134.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at SourceConversion$127.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> {code}
> trace code:
> {code:java}
> // code placeholder
> @Internal
> public class HBaseTypeUtils {
>
> 	private static final byte[] EMPTY_BYTES = new byte[]{};
>
> 	/**
> 	 * Deserialize byte array to Java Object with the given type.
> 	 */
> 	public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
> 		switch (typeIdx) {
> 			case 0: // byte[]
> 				return value;
> 			case 1: // String
> 				return new String(value, stringCharset);
> 			...
>
> // the java.lang.String constructor used above:
> public String(byte bytes[], Charset charset) {
> 	this(bytes, 0, bytes.length, charset);
> }{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)