You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/08 09:42:56 UTC

[GitHub] [pulsar] ilyam8 opened a new issue #6510: negative `unackedMessages` in consumers stats

ilyam8 opened a new issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510
 
 
   **Describe the bug**
   
   Pulsar docker image `apachepulsar/pulsar:2.5.0`. Standalone mode.
   
   I noticed that `pulsar_subscription_unacked_messages` value in the prometheus stats is a negative number for `non-persitent` topics.
   
   **According my tests (not extensive) the bug appears only when topic type is `non-persistent`**.
   
   Related: #5929
   
   ___
   
   That is what i get:
   
   ```cmd
   pulsar_subscription_unacked_messages{cluster="standalone",namespace="sample/ns1",topic="non-persistent://sample/ns1/demo-1",subscription="my-consumer-11"} -4739 1583659567277
   ```
   
   
   ```json
   {
       "non-persistent://sample/ns1/demo-1": {
           "publishers": [
               {
                   "msgRateIn": 8.998,
                   "msgThroughputIn": 449.904,
                   "averageMsgSize": 50,
                   "address": "/172.17.0.1:34560",
                   "producerId": 0,
                   "producerName": "my-producer-11",
                   "connectedSince": "2020-03-08T09:17:45.526Z",
                   "clientVersion": "pulsar-client-go",
                   "metadata": {}
               }
           ],
           "replication": {},
           "subscriptions": {
               "my-consumer-11": {
                   "consumers": [
                       {
                           "address": "/172.17.0.1:34560",
                           "consumerName": "",
                           "availablePermits": 1,
                           "connectedSince": "2020-03-08T09:17:45.526Z",
                           "msgRateOut": 8.998,
                           "msgThroughputOut": 449.901,
                           "msgRateRedeliver": 0,
                           "unackedMessages": -5925,
                           "blockedConsumerOnUnackedMsgs": false,
                           "clientVersion": "pulsar-client-go",
                           "metadata": {}
                       }
                   ],
                   "msgBacklog": 0,
                   "msgRateExpired": 0,
                   "msgRateOut": 8.998,
                   "msgThroughputOut": 449.901,
                   "msgRateRedeliver": 0,
                   "type": "Shared",
                   "msgDropRate": 0
               }
           },
           "producerCount": 1,
           "averageMsgSize": 49.999,
           "msgRateIn": 8.998,
           "msgRateOut": 8.998,
           "msgThroughputIn": 449.904,
           "msgThroughputOut": 449.901
       }
   }
   ```
   
   
   **To Reproduce**
   Steps to reproduce the behavior:
   
   See the code below. It creates publisher/consumer for following topics:
   
   - `non-persistent://sample/ns1/demo-1` # has hegative `unackedMessages` in consumers stats
   - `persistent://sample/ns1/demo-1` # no problem
   
   And that is pretty much it.
   
   ```go
   package main
   
   import (
   	"context"
   	"fmt"
   	"log"
   	"os"
   	"os/signal"
   	"sync"
   	"syscall"
   	"time"
   
   	comcast "github.com/Comcast/pulsar-client-go"
   )
   
   var pulsarPool = comcast.NewManagedClientPool()
   
   func newPulsarProducer(name, topic string) *comcast.ManagedProducer {
   	return comcast.NewManagedProducer(pulsarPool, comcast.ManagedProducerConfig{
   		ManagedClientConfig: comcast.ManagedClientConfig{
   			ClientConfig: comcast.ClientConfig{
   				Addr: "pulsar://localhost:6650",
   			},
   		},
   		Topic: topic,
   		Name:  name,
   	})
   }
   
   func newPulsarConsumer(name, topic string) *comcast.ManagedConsumer {
   	return comcast.NewManagedConsumer(pulsarPool, comcast.ManagedConsumerConfig{
   		ManagedClientConfig: comcast.ManagedClientConfig{
   			ClientConfig: comcast.ClientConfig{
   				Addr: "pulsar://localhost:6650",
   			},
   		},
   		Topic: topic,
   		Name:  name,
   	})
   }
   
   func startPulsarProducer(p *comcast.ManagedProducer, wg *sync.WaitGroup, stop chan struct{}) {
   	defer func() {
   		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
   		defer cancel()
   		_ = p.Close(ctx)
   		wg.Done()
   	}()
   	produce := func(msg string) error {
   		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
   		defer cancel()
   		_, err := p.Send(ctx, []byte(msg))
   		return err
   	}
   
   loop:
   	for i := 0; ; i++ {
   		select {
   		case <-stop:
   			break loop
   		default:
   			msg := fmt.Sprintf("message-%d", i)
   			if err := produce(msg); err != nil {
   				log.Printf("producer error: %v\n", err)
   				break loop
   			}
   		}
   		time.Sleep(time.Millisecond * 100)
   	}
   }
   
   func startPulsarConsumer(c *comcast.ManagedConsumer, wg *sync.WaitGroup) {
   	defer func() {
   		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
   		defer cancel()
   		_ = c.Unsubscribe(ctx)
   		_ = c.Close(ctx)
   		wg.Done()
   	}()
   	consume := func() error {
   		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
   		defer cancel()
   
   		msg, err := c.Receive(ctx)
   		if err != nil {
   			return err
   		}
   		log.Println("received:", msg.Topic, string(msg.Payload), *msg.Msg.ConsumerId)
   
   		return c.Ack(ctx, msg)
   	}
   
   	for {
   		if err := consume(); err != nil {
   			log.Printf("consumer error: %v\n", err)
   			break
   		}
   	}
   }
   
   func doPulsar() {
   	stopCh := make(chan struct{})
   	go func() {
   		signalChan := make(chan os.Signal, 1)
   		signal.Notify(signalChan, syscall.SIGINT)
   		<-signalChan
   		close(stopCh)
   		log.Println("SIGINT received. Terminating...")
   	}()
   
   	topic1 := "non-persistent://sample/ns1/demo-1"
   	p11 := newPulsarProducer("my-producer-11", topic1)
   	c11 := newPulsarConsumer("my-consumer-11", topic1)
   
   	topic2 := "persistent://sample/ns1/demo-1"
   	p21 := newPulsarProducer("my-producer-21", topic2)
   	c21 := newPulsarConsumer("my-consumer-21", topic2)
   
   	var wg sync.WaitGroup
   	wg.Add(4)
   	go startPulsarProducer(p11, &wg, stopCh)
   	go startPulsarConsumer(c11, &wg)
   
   	go startPulsarProducer(p21, &wg, stopCh)
   	go startPulsarConsumer(c21, &wg)
   
   	wg.Wait()
   }
   
   func main() {
   	doPulsar()
   }
   
   ```
   
   
   **Expected behavior**
   
   Do not have negative `unackedMessages`/`pulsar_subscription_unacked_messages` values.
   
   
   **Desktop (please complete the following information):**
    - ProductName:	Mac OS X, ProductVersion:	10.15.3
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on issue #6510: negative `unackedMessages` in consumers stats

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510#issuecomment-599328906
 
 
   @ilyam8 I think #5929 can fix this issue because of the value of `maxUnackedMessages` always 0 for a non-persistent topic's consumer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui edited a comment on issue #6510: negative `unackedMessages` in consumers stats

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510#issuecomment-599328906
 
 
   @ilyam8 I think #5929 can fix this issue because of the value of `maxUnackedMessages` always 0 for a non-persistent topic's consumer. And #5929 will release at 2.5.1 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui closed issue #6510: negative `unackedMessages` in consumers stats

Posted by GitBox <gi...@apache.org>.
codelipenghui closed issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] jiazhai commented on issue #6510: negative `unackedMessages` in consumers stats

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510#issuecomment-597430306
 
 
   @codelipenghui  #5929,  Would you please help on this issue? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] ilyam8 commented on issue #6510: negative `unackedMessages` in consumers stats

Posted by GitBox <gi...@apache.org>.
ilyam8 commented on issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510#issuecomment-599357946
 
 
   Oh, i didnt notice `2.5.1` label in the #5929, i thought is is in the `2.5.0`. 
   
   Ok, so this issue is duplicate of #5755, feel free to close. Thx!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on issue #6510: negative `unackedMessages` in consumers stats

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510#issuecomment-599450219
 
 
   Ok, thanks for your feedback, close this issue via #5929

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services