Posted to commits@pulsar.apache.org by "panszobe (via GitHub)" <gi...@apache.org> on 2023/03/18 00:57:35 UTC

[GitHub] [pulsar-client-go] panszobe opened a new issue, #993: Messages unacked when EnableBatchIndexAck is false

panszobe opened a new issue, #993:
URL: https://github.com/apache/pulsar-client-go/issues/993

   #### Expected behavior
   
   All messages should be acked when `EnableBatchIndexAck` is false, just as they are when `EnableBatchIndexAck` is true.
   
   #### Actual behavior
   
   When `EnableBatchIndexAck` is set to false, the number of unacked messages kept growing, as shown in the Grafana dashboard below:
   
   <img width="1377" alt="image" src="https://user-images.githubusercontent.com/75996911/226072338-060010cb-73ef-451a-a7ff-0212222f921e.png">
   
   As a result, the Pulsar broker used more and more memory to track the unacked messages. Although the client kept consuming messages, the backlog kept growing, and restarting the consumer led to duplicate consumption of messages that had already been processed.
   
   #### System configuration
   
   pulsar-client-go version: master(**v0.9.1-0.20230313030101-bcbac9f2ae5c**)
   
   
   #### Code Review
   I think the root cause is the following:
   
   ```go
   func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
   	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
   		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
   		return errors.New("consumer state is closed")
   	}
   
   	if cmid, ok := msgID.(*chunkMessageID); ok {
   		return pc.unAckChunksTracker.ack(cmid)
   	}
   
   	trackingID := toTrackingMessageID(msgID)
   
   	if trackingID != nil && trackingID.ack() {
   		pc.metrics.AcksCounter.Inc()
   		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
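   	} else if !pc.options.enableBatchIndexAck {
   		// The batch is not fully acked yet: return without adding this message
   		// to ackGroupingTracker, so it never reaches pendingAcks.
   		return nil
   	}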
   
   	var ackReq *ackRequest
   	if withResponse {
   		// Assign with "=" (not ":=") so the outer ackReq, and its err, is returned below.
   		ackReq = pc.sendIndividualAck(trackingID)
   		<-ackReq.doneCh
   	} else {
   		pc.ackGroupingTracker.add(trackingID)
   	}
   	pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
   	if ackReq == nil {
   		return nil
   	}
   	return ackReq.err
   }
   
   func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID) map[[2]uint64]*bitset.BitSet {
   	t.Lock()
   	defer t.Unlock()
   	key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
   
   	batchIdx := id.BatchIdx()
   	batchSize := id.BatchSize()
   
   	if batchIdx >= 0 && batchSize > 0 {
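   		bs, found := t.pendingAcks[key]
   		if !found {
   			// First time this entry is seen: mark every batch index as unacked
   			// (set bits are batch indexes that are still unacked).
   			if batchSize > 1 {
   				bs = bitset.New(uint(batchSize))
   				for i := uint(0); i < uint(batchSize); i++ {
   					bs.Set(i)
   				}
   			}
   			t.pendingAcks[key] = bs
   		}
   		if bs != nil {
   			// Clear only the bit of the message being acked.
   			bs.Clear(uint(batchIdx))
   		}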
   	} else {
   		t.pendingAcks[key] = nil
   	}
   
   	if len(t.pendingAcks) >= t.maxNumAcks {
   		pendingAcks := t.pendingAcks
   		t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
   		return pendingAcks
   	}
   	return nil
   }
   ```
   
   When `EnableBatchIndexAck` is false and `trackingID.ack()` returns false (that is, not all messages of the batch have been acked yet), `ackID` returns early, so the message is never added to `ackGroupingTracker` and never enters `pendingAcks`.

   In this scenario, messages of the same batch can be split across different ACK flushes because of the `AckGroupingOptions` limits. The `ackTracker` still records the batch, and the last message of the batch (the one for which `trackingID.ack()` returns true) is added to `pendingAcks` of `ackGroupingTracker`. However, when `timedAckGroupingTracker.tryAddIndividual()` sees a batch for the first time, it sets all bits of the batch's bitset to 1 and clears only the bit of the current message. This is correct when `EnableBatchIndexAck` is true, but when it is false, only one batch index of the batch is ever acked and the other messages remain unacked.
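   The interaction above can be reproduced with a small self-contained model. This is a sketch under assumptions, not the client's code: `groupingModel`, `msgID`, `ackBatch` and `addWholeEntry` are hypothetical names, a `[]bool` stands in for `bitset.BitSet` (true means the batch index is still unacked), messages are acked in batch order so the last index is the one for which `trackingID.ack()` returns true, and the whole-entry ack is only one possible fix, not necessarily what the eventual PR does.

   ```go
   package main

   import "fmt"

   // entryKey mirrors the [2]uint64{ledgerID, entryID} key of pendingAcks.
   type entryKey [2]uint64

   // msgID is a hypothetical stand-in for a batched MessageID.
   type msgID struct {
   	ledger, entry       uint64
   	batchIdx, batchSize int
   }

   // groupingModel models pendingAcks: a nil slice means "ack the whole entry";
   // otherwise true marks a batch index that is still unacked.
   type groupingModel struct {
   	pending map[entryKey][]bool
   }

   func newGroupingModel() *groupingModel {
   	return &groupingModel{pending: make(map[entryKey][]bool)}
   }

   // addIndividual follows tryAddIndividual: on the first add of a batch, mark
   // every index as unacked, then clear only the index being acked.
   func (g *groupingModel) addIndividual(id msgID) {
   	key := entryKey{id.ledger, id.entry}
   	if id.batchIdx >= 0 && id.batchSize > 0 {
   		bs, found := g.pending[key]
   		if !found {
   			if id.batchSize > 1 {
   				bs = make([]bool, id.batchSize)
   				for i := range bs {
   					bs[i] = true
   				}
   			}
   			g.pending[key] = bs
   		}
   		if bs != nil {
   			bs[id.batchIdx] = false // shares the slice stored in the map
   		}
   	} else {
   		g.pending[key] = nil
   	}
   }

   // addWholeEntry is one possible fix (assumption): when batch index ack is
   // disabled, ack the entire entry, as is already done for non-batched messages.
   func (g *groupingModel) addWholeEntry(id msgID) {
   	g.pending[entryKey{id.ledger, id.entry}] = nil
   }

   // unackedIndexes returns the batch indexes still marked unacked for key.
   func (g *groupingModel) unackedIndexes(key entryKey) []int {
   	var out []int
   	for i, unacked := range g.pending[key] {
   		if unacked {
   			out = append(out, i)
   		}
   	}
   	return out
   }

   // ackBatch models ackID for every message of one batch, acked in order.
   // With batch index ack disabled, only the message that completes the batch
   // reaches the tracker; the others hit the early `return nil`.
   func ackBatch(g *groupingModel, ledger, entry uint64, batchSize int, batchIndexAck, wholeEntryFix bool) {
   	for idx := 0; idx < batchSize; idx++ {
   		id := msgID{ledger: ledger, entry: entry, batchIdx: idx, batchSize: batchSize}
   		completesBatch := idx == batchSize-1
   		if !completesBatch && !batchIndexAck {
   			continue
   		}
   		if !batchIndexAck && wholeEntryFix {
   			g.addWholeEntry(id)
   		} else {
   			g.addIndividual(id)
   		}
   	}
   }

   func main() {
   	key := entryKey{1, 10}
   	for _, tc := range []struct {
   		name          string
   		batchIndexAck bool
   		fix           bool
   	}{
   		{"batch index ack enabled", true, false},
   		{"batch index ack disabled", false, false},
   		{"disabled, whole-entry fix", false, true},
   	} {
   		g := newGroupingModel()
   		ackBatch(g, key[0], key[1], 4, tc.batchIndexAck, tc.fix)
   		fmt.Printf("%s: unacked indexes %v\n", tc.name, g.unackedIndexes(key))
   	}
   }
   ```

   For a batch of 4 messages, only the disabled case leaves indexes 0, 1 and 2 unacked; the whole-entry variant reuses the `nil` bitset convention that `tryAddIndividual` already applies to non-batched messages.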
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] BewareMyPower commented on issue #993: Messages unacked when EnableBatchIndexAck is false

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on issue #993:
URL: https://github.com/apache/pulsar-client-go/issues/993#issuecomment-1474716166

   I will push the PR soon.


[GitHub] [pulsar-client-go] panszobe commented on issue #993: Messages unacked when EnableBatchIndexAck is false

Posted by "panszobe (via GitHub)" <gi...@apache.org>.
panszobe commented on issue #993:
URL: https://github.com/apache/pulsar-client-go/issues/993#issuecomment-1475502833

   > I will push the PR soon.
   
   OK, I will test it after the PR is merged.


[GitHub] [pulsar-client-go] shibd closed issue #993: Messages unacked when EnableBatchIndexAck is false

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd closed issue #993: Messages unacked when EnableBatchIndexAck is false 
URL: https://github.com/apache/pulsar-client-go/issues/993


[GitHub] [pulsar-client-go] BewareMyPower commented on issue #993: Messages unacked when EnableBatchIndexAck is false

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on issue #993:
URL: https://github.com/apache/pulsar-client-go/issues/993#issuecomment-1474677998

   Since you have found this problem, could you open a PR to fix it?


[GitHub] [pulsar-client-go] panszobe commented on issue #993: Messages unacked when EnableBatchIndexAck is false

Posted by "panszobe (via GitHub)" <gi...@apache.org>.
panszobe commented on issue #993:
URL: https://github.com/apache/pulsar-client-go/issues/993#issuecomment-1474553501

   @BewareMyPower Hi, could you take a look? I ran into this problem while testing in my local environment.
