Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/04/13 02:59:00 UTC

[jira] [Commented] (FLINK-17097) Flink HBase Connector String field size at least equal 8

    [ https://issues.apache.org/jira/browse/FLINK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17082006#comment-17082006 ] 

Jark Wu commented on FLINK-17097:
---------------------------------

I think we should collect nothing if the rowkey is invalid. Would you like to take this issue? [~xingoo]

> Flink HBase Connector String field size at least equal 8
> --------------------------------------------------------
>
>                 Key: FLINK-17097
>                 URL: https://issues.apache.org/jira/browse/FLINK-17097
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: 1.10.0
>            Reporter: xingoo
>            Priority: Major
>
> When using a string field in the HBase connector, the rowkey must be at least 8 bytes long, because the byte[] passed to deserialization must hold at least 8 bytes.
> Example:
> {code:java}
> rowkey: "1"
> {code}
> When it is used in a lookup function, the following exception is thrown:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 1
> 	at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:779)
> 	at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:753)
> 	at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:726)
> 	at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:57)
> 	at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
> 	at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
> 	at StreamExecCorrelate$144.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> 	at StreamExecCalc$134.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> 	at SourceConversion$127.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> {code}
> Relevant code path:
> {code:java}
> @Internal
> public class HBaseTypeUtils {
>  private static final byte[] EMPTY_BYTES = new byte[]{};
>  /**
>   * Deserialize byte array to Java Object with the given type.
>   */
>  public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
>   switch (typeIdx) {
>    case 0: // byte[]
>     return value;
>    case 1: // String
>     return new String(value, stringCharset);
> ...
> public String(byte bytes[], Charset charset) {
>     this(bytes, 0, bytes.length, charset);
> }{code}
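The suggestion above (collect nothing when the rowkey is invalid) could look roughly like the following length guard. This is only an illustrative sketch: `deserializeLongOrSkip` is a hypothetical helper, and plain `ByteBuffer` decoding stands in for `org.apache.hadoop.hbase.util.Bytes.toLong`, which throws `IllegalArgumentException` for arrays shorter than 8 bytes.

```java
import java.nio.ByteBuffer;

public class RowkeyGuard {

    // Hypothetical guard (not the actual Flink/HBase API): return null to
    // signal "skip this row" when the serialized rowkey holds fewer than
    // the 8 bytes a long requires, instead of letting deserialization
    // throw IllegalArgumentException as in the stack trace above.
    static Long deserializeLongOrSkip(byte[] value) {
        if (value == null || value.length < Long.BYTES) {
            return null; // invalid rowkey: collect nothing
        }
        // Read the first 8 bytes as a big-endian long, like Bytes.toLong.
        return ByteBuffer.wrap(value, 0, Long.BYTES).getLong();
    }

    public static void main(String[] args) {
        byte[] shortKey = "1".getBytes();  // 1 byte, as in the report
        byte[] validKey = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        System.out.println(deserializeLongOrSkip(shortKey)); // prints "null"
        System.out.println(deserializeLongOrSkip(validKey)); // prints "42"
    }
}
```

The caller (e.g. the lookup function's eval) would then simply not emit a row when the helper returns null, rather than failing the whole job on one malformed key.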



--
This message was sent by Atlassian Jira
(v8.3.4#803005)