Posted to dev@flink.apache.org by "xingoo (Jira)" <ji...@apache.org> on 2020/04/13 02:52:00 UTC

[jira] [Created] (FLINK-17097) Flink HBase Connector String field size at least equal 8

xingoo created FLINK-17097:
------------------------------

             Summary: Flink HBase Connector String field size at least equal 8
                 Key: FLINK-17097
                 URL: https://issues.apache.org/jira/browse/FLINK-17097
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HBase
    Affects Versions: 1.10.0
            Reporter: xingoo


When using a String field as the rowkey in the HBase connector, the rowkey must be at least 8 bytes long, because the byte[] is deserialized with Bytes.toLong, which requires at least 8 bytes.

example:
{code:java}
// code placeholder
rowkey: "1"
{code}
When using it as a lookup function, the following exception is thrown:
{code:java}
// code placeholder
Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 1
	at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:779)
	at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:753)
	at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:726)
	at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:57)
	at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
	at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
	at StreamExecCorrelate$144.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at StreamExecCalc$134.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at SourceConversion$127.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

{code}
Relevant code:
{code:java}
// code placeholder
@Internal
public class HBaseTypeUtils {

	private static final byte[] EMPTY_BYTES = new byte[]{};

	/**
	 * Deserialize byte array to Java Object with the given type.
	 */
	public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
		switch (typeIdx) {
			case 0: // byte[]
				return value;
			case 1: // String
				return new String(value, stringCharset);

...

// java.lang.String constructor invoked above (no minimum length requirement):
public String(byte bytes[], Charset charset) {
	this(bytes, 0, bytes.length, charset);
}
{code}
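The length check that throws the exception above can be reproduced without an HBase cluster. Below is a minimal, self-contained sketch; ToLongCheck is a hypothetical class, and the check mirrors the behavior visible in the stack trace from org.apache.hadoop.hbase.util.Bytes.toLong rather than copying the actual HBase source:

```java
import java.nio.charset.StandardCharsets;

public class ToLongCheck {
	static final int SIZEOF_LONG = 8;

	// Mimics the offset/length validation that Bytes.toLong performs before
	// reading 8 bytes (hypothetical re-implementation for illustration).
	static long toLong(byte[] bytes, int offset, int length) {
		if (length != SIZEOF_LONG || offset + length > bytes.length) {
			throw new IllegalArgumentException(
				"offset (" + offset + ") + length (" + length
				+ ") exceed the capacity of the array: " + bytes.length);
		}
		long l = 0;
		for (int i = offset; i < offset + length; i++) {
			l <<= 8;
			l ^= bytes[i] & 0xFF;
		}
		return l;
	}

	public static void main(String[] args) {
		// A 1-byte rowkey, as in the report above.
		byte[] rowkey = "1".getBytes(StandardCharsets.UTF_8);
		try {
			toLong(rowkey, 0, SIZEOF_LONG);
		} catch (IllegalArgumentException e) {
			// prints: offset (0) + length (8) exceed the capacity of the array: 1
			System.out.println(e.getMessage());
		}
	}
}
```

Any rowkey shorter than 8 bytes trips the same check, which matches the report that String rowkeys only work once they reach 8 bytes.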



--
This message was sent by Atlassian Jira
(v8.3.4#803005)