You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hbase.apache.org by "Duo Zhang (Jira)" <ji...@apache.org> on 2022/09/06 15:10:00 UTC
[jira] [Reopened] (HBASE-27267) Delete causes timestamp to be negative
[ https://issues.apache.org/jira/browse/HBASE-27267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Duo Zhang reopened HBASE-27267:
-------------------------------
> Delete causes timestamp to be negative
> --------------------------------------
>
> Key: HBASE-27267
> URL: https://issues.apache.org/jira/browse/HBASE-27267
> Project: HBase
> Issue Type: Bug
> Affects Versions: 2.3.4
> Reporter: zhengsicheng
> Assignee: zhengsicheng
> Priority: Major
> Fix For: 3.0.0-alpha-1, 2.5.0, 2.4.5
>
> Attachments: image-2022-08-11-17-10-00-389.png, screenshot-1.png
>
>
> When using client 1.1.6 against server 2.3.4, there is a case where the timestamp of a batch delete is negative.
> # 1. RegionServer log message:
> {code:java}
> 2022-07-19 12:13:29,324 WARN [RS_OPEN_REGION-regionserver/HBASE-HOSTNAME1:16020-1.replicationSource.wal-reader.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.regiongroup-2,clusterB] hbase.KeyValueUtil: Timestamp cannot be negative, ts=-4323977095312258207, KeyValueBytesHex=\x00\x00\x00, offset=0, length=40
> 2022-07-19 12:13:29,324 WARN [RS_OPEN_REGION-regionserver/HBASE-HOSTNAME1:16020-1.replicationSource.wal-reader.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.regiongroup-2,clusterB] wal.ProtobufLogReader: Encountered a malformed edit, seeking back to last good position in file, from 1099261 to 1078224
> java.io.EOFException: EOF while reading 660 WAL KVs; started reading at 1078317 and read up to 1099261
> at org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.readNext(ProtobufLogReader.java:403)
> at org.apache.hadoop.hbase.regionserver.wal.ReaderBase.next(ReaderBase.java:97)
> at org.apache.hadoop.hbase.regionserver.wal.ReaderBase.next(ReaderBase.java:85)
> at org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.readNextEntryAndRecordReaderPosition(WALEntryStream.java:264)
> at org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.tryAdvanceEntry(WALEntryStream.java:178)
> at org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.hasNext(WALEntryStream.java:103)
> at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.readWALEntries(ReplicationSourceWALReader.java:230)
> at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.run(ReplicationSourceWALReader.java:145)
> Caused by: java.lang.IllegalArgumentException: Timestamp cannot be negative, ts=-4323977095312258207, KeyValueBytesHex=\x00\x00\x00, offset=0, length=40
> at org.apache.hadoop.hbase.KeyValueUtil.checkKeyValueBytes(KeyValueUtil.java:612)
> at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:346)
> at org.apache.hadoop.hbase.KeyValueUtil.createKeyValueFromInputStream(KeyValueUtil.java:717)
> at org.apache.hadoop.hbase.codec.KeyValueCodecWithTags$KeyValueDecoder.parseCell(KeyValueCodecWithTags.java:81)
> at org.apache.hadoop.hbase.codec.BaseDecoder.advance(BaseDecoder.java:68)
> at org.apache.hadoop.hbase.wal.WALEdit.readFromCells(WALEdit.java:276)
> at org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.readNext(ProtobufLogReader.java:387)
> ... 7 more
> {code}
> # 2. Debugging the WAL file shows that the negative timestamp is caused by a delete operation:
> {code:java}
> Sequence=365693989, table=tableA, region=148cedb7b8ca3145690800fd650e084d, at write timestamp=Sat Jul 16 00:50:01 CST 2022
> 2022-07-22 22:09:43,244 ERROR [main] wal.WALPrettyPrinter: Timestamp is negative row=rowkey1, column=d:act, timestamp=-4323977095312258207, type=Delete
> {code}
> # 3. The user uses Spark to read from and write to HBase.
> The batchsize is 10000.
> {code:scala}
> def dataDeleteFromHbase(rdd: RDD[(String, String)], hbase_table: String, hbase_instance: String, hbase_accesskey: String, accumulator: LongAccumulator, buffersize: String, batchsize: Int): Unit = {
> rdd.foreachPartition(iterator => {
> val partitionId = TaskContext.getPartitionId()
> val conf = HBaseConfiguration.create()
> val connection = SparkHbaseUtils.getconnection(conf)
> val table = connection.getTable(TableName.valueOf(hbase_table))
> var deleteList = new util.LinkedList[Delete]()
> var count = 0
> var batchCount = 0
> while (iterator.hasNext) {
> val element = iterator.next
> val crc32 = new CRC32()
> crc32.update(s"${element._1}_${element._2}".getBytes())
> val crcArr = convertLow4bit2SmallEndan(crc32.getValue)
> val key = concat(DigestUtils.md5(s"${element._1}_${element._2}"), crcArr)
> val delete = new Delete(key)
> deleteList.add(delete)
> count += 1
> if (count % batchsize.toInt == 0) {
> batchCount = batchCount + 1
> try {
> table.delete(deleteList)
> } catch {
> case _: RetriesExhaustedWithDetailsException => {
> LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
> Thread.sleep(1000)
> processDelThrottlingException(table, deleteList, partitionId, batchCount)
> }
> case _: ThrottlingException => {
> LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
> Thread.sleep(1000)
> processDelThrottlingException(table, deleteList, partitionId, batchCount)
> }
> }
> LOGGER.warn(s"======partitionId: ${partitionId}===${batchCount * batchsize} rows delete success! ============")
> accumulator.add(batchsize)
> LOGGER.warn(s"##########################already delete count: ${accumulator.value}#######################")
> deleteList = new util.LinkedList[Delete]()
> }
> }
> if (CollectionUtils.isNotEmpty(deleteList)) {
> batchCount = batchCount + 1
> val listSize = deleteList.size()
> try {
> table.delete(deleteList)
> } catch {
> case _: RetriesExhaustedWithDetailsException => {
> LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
> Thread.sleep(1000)
> processDelThrottlingException(table, deleteList, partitionId, batchCount)
> }
> case _: ThrottlingException => {
> LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
> Thread.sleep(1000)
> processDelThrottlingException(table, deleteList, partitionId, batchCount)
> }
> }
> LOGGER.warn(s"======partitionId: ${partitionId}===${(batchCount - 1) * batchsize + listSize} rows delete success! ============")
> accumulator.add(listSize)
> LOGGER.warn(s"##########################already delete count: ${accumulator.value}#######################")
> }
> if (table != null) {
> table.close()
> }
> if (connection != null) {
> connection.close()
> }
> })
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)