You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/10 02:43:04 UTC

[GitHub] [pulsar] startjava created a discussion: what time merge about this version ?

GitHub user startjava created a discussion: what time merge about this version ?

https://github.com/apache/pulsar/blob/4ee346693df8fc8314f94d53b00283f1c6079dc1/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java

now 2.10.1 version no top url api.
thank .

GitHub link: https://github.com/apache/pulsar/discussions/17978

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

Change `                consumer.acknowledgeCumulative(msg);` to ` consumer.acknowledge(msg); ` also get what you want.

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844883

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

Seems like the release/2.11.0 plan have not include it but it exist in milestone 2.11.0.  

PR: https://github.com/apache/pulsar/pull/16817

/cc @codelipenghui 

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835798

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

```

package com.ghy.www.cumulativeacknowledgement.consumer.listener;


import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;

@Component
public class MyMessageListener1 implements MessageListener {
    @Override
    public void received(Consumer consumer, Message msg) {
        try {
            String messageString = new String(msg.getData());
            System.out.println(messageString);
            if (messageString.equals("acknowledge消息5")) {
                consumer.acknowledge(msg);
                System.out.println(messageString + "执行了acknowledge");
            }
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }
}
```

two method both test , both return 0 .
i want implement Cumulative and Not Cumulative Diff 。

i code envirment has error ?


GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844926

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

`bin/pulsar-admin topics partitioned-stats {topic}` or `bin/pulsar-admin topics stats {topic}` 

and you can get this message:

```
"subscriptions" : {
    {subscriptionsName} : {
      "consumers": [{
            "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
         ...
      }]
 ...
```

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835851

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

I'm sorry I misunderstood.



GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835824

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

@liangyuanpeng  current version 2.10.1 getUnackedMessages() method  availability??

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844041

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

use bottom code print info :
```
    @RequestMapping("getUnAckMsgCount")
    public String getUnAckMsgCount() throws PulsarClientException, PulsarAdminException {
        {
            List<? extends ConsumerStats> consumers = pulsarAdmin.topics().getStats(myTopic1).getSubscriptions().get(myTopic1_subscriptionName).getConsumers();
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerStats consumerStats = consumers.get(i);
                System.out.println(myTopic1 + " getConsumerName=" + consumerStats.getConsumerName() + " getUnackedMessages=" + consumerStats.getUnackedMessages());
            }
        }
        System.out.println();
        {
            List<? extends ConsumerStats> consumers = pulsarAdmin.topics().getStats(myTopic2).getSubscriptions().get(myTopic2_subscriptionName).getConsumers();
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerStats consumerStats = consumers.get(i);
                System.out.println(myTopic2 + " getConsumerName=" + consumerStats.getConsumerName() + " getUnackedMessages=" + consumerStats.getUnackedMessages());
            }
        }
        return "成功获得";
    }
```


GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844937

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

what time release&nbsp;2.11.0 RELEASE version?



------------------&nbsp;原始邮件&nbsp;------------------
发件人: ***@***.***&gt;; 
发送时间: 2022年10月10日(星期一) 上午10:52
收件人: ***@***.***&gt;; 
抄送: ***@***.***&gt;; ***@***.***&gt;; 
主题: Re: [apache/pulsar] what time merge about this version ? (Discussion #17978)





 
Seems like the release/2.11.0 plan have not include it but it exist in milestone 2.11.0.
 
PR: #16817
 
/cc @codelipenghui
 
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you authored the thread.Message ID: ***@***.***&gt;

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835808

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

You can try to do not ack message and get the getUnackedMessages .

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844734

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

Do you mean you want to get this message from `pulsar-admin`?

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835821

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

`pulsarAdmin.topics().getStats("").getSubscriptions().get("").getConsumers().get(index).getUnackedMessages()`

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835867

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

  M1-M5 would be acked when you acknowledgeCumulative M5 , that is the reason of you got the unackmessages is 0.

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844311

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

thank you ! very much ! i wait future version test this api .

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844343

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng added a comment to the discussion: what time merge about this version ?

This API is working for 2.10.1, you can use it on now.

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3844714

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

@liangyuanpeng use which version test ?

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3845454

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

thank you , but i want use java client get "unAckMsgCount" info 。
thank you too .




------------------&nbsp;原始邮件&nbsp;------------------
发件人: ***@***.***&gt;; 
发送时间: 2022年10月10日(星期一) 中午11:11
收件人: ***@***.***&gt;; 
抄送: ***@***.***&gt;; ***@***.***&gt;; 
主题: Re: [apache/pulsar] what time merge about this version ? (Discussion #17978)





 
bin/pulsar-admin topics partitioned-stats {topic} or bin/pulsar-admin topics stats {topic}
 
and you can get this message:
 "subscriptions" : {     {subscriptionsName} : {       "consumers": [{             "msgRateOut" : 0.0,            "msgThroughputOut" : 0.0,          ...       }]  ...  
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you authored the thread.Message ID: ***@***.***&gt;

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835855

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng edited a comment on the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng edited a comment on the discussion: what time merge about this version ?

~~Seems like the release/2.11.0 plan have not include it but it exist in milestone 2.11.0.  ~~

~~PR: https://github.com/apache/pulsar/pull/16817~~

~~/cc @codelipenghui ~~

GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835798

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] liangyuanpeng edited a comment on the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user liangyuanpeng edited a comment on the discussion: what time merge about this version ?

~~Seems like the release/2.11.0 plan have not include it but it exist in milestone 2.11.0.  ~~

~~PR: https://github.com/apache/pulsar/pull/16817~~



GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835798

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava edited a comment on the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava edited a comment on the discussion: what time merge about this version ?

> `pulsarAdmin.topics().getStats("").getSubscriptions().get("").getConsumers().get(index).getUnackedMessages()`

producer:
```
    @RequestMapping("sendMessage2")
    public String sendMessage2() throws IOException, PulsarAdminException {
        for (int i = 1; i <= 5; i++) {
            String messageValue = "消息" + i;
            producer2.send(messageValue);
        }
        return "成功生产2";
    }
```
    
consumer:
```
@Component
public class MyMessageListener2 implements MessageListener {
    @Override
    public void received(Consumer consumer, Message msg) {
        try {
            String messageString = new String(msg.getData());
            if (messageString.equals("消息5")) {
                consumer.acknowledgeCumulative(msg);
                System.out.println("执行了acknowledgeCumulative");
            }
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }
}
```

getinfo:
```
    @RequestMapping("getUnAckMsgCount")
    public String getUnAckMsgCount() throws PulsarClientException, PulsarAdminException {
        {
            List<? extends ConsumerStats> consumers = pulsarAdmin.topics().getStats(myTopic1).getSubscriptions().get(myTopic1_subscriptionName).getConsumers();
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerStats consumerStats = consumers.get(i);
                System.out.println(myTopic1 + " getConsumerName=" + consumerStats.getConsumerName() + " getUnackedMessages=" + consumerStats.getUnackedMessages());
            }
        }
        System.out.println();
        {
            List<? extends ConsumerStats> consumers = pulsarAdmin.topics().getStats(myTopic2).getSubscriptions().get(myTopic2_subscriptionName).getConsumers();
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerStats consumerStats = consumers.get(i);
                System.out.println(myTopic2 + " getConsumerName=" + consumerStats.getConsumerName() + " getUnackedMessages=" + consumerStats.getUnackedMessages());
            }
        }
        return "成功获得";
    }
}
```

result:
```
myTopic1 getConsumerName=消费者1 getUnackedMessages=0

myTopic2 getConsumerName=消费者2 getUnackedMessages=0

```
run acknowledge or acknowledgeCumulative method , getUnackedMessages result both 0 .

why ?? thank you !



GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835999

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] startjava added a comment to the discussion: what time merge about this version ?

Posted by GitBox <gi...@apache.org>.
GitHub user startjava added a comment to the discussion: what time merge about this version ?

> `pulsarAdmin.topics().getStats("").getSubscriptions().get("").getConsumers().get(index).getUnackedMessages()`

producer:
    @RequestMapping("sendMessage2")
    public String sendMessage2() throws IOException, PulsarAdminException {
        for (int i = 1; i <= 5; i++) {
            String messageValue = "消息" + i;
            producer2.send(messageValue);
        }
        return "成功生产2";
    }
    
consumer:
@Component
public class MyMessageListener2 implements MessageListener {
    @Override
    public void received(Consumer consumer, Message msg) {
        try {
            String messageString = new String(msg.getData());
            if (messageString.equals("消息5")) {
                consumer.acknowledgeCumulative(msg);
                System.out.println("执行了acknowledgeCumulative");
            }
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }
}

getinfo:
    @RequestMapping("getUnAckMsgCount")
    public String getUnAckMsgCount() throws PulsarClientException, PulsarAdminException {
        {
            List<? extends ConsumerStats> consumers = pulsarAdmin.topics().getStats(myTopic1).getSubscriptions().get(myTopic1_subscriptionName).getConsumers();
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerStats consumerStats = consumers.get(i);
                System.out.println(myTopic1 + " getConsumerName=" + consumerStats.getConsumerName() + " getUnackedMessages=" + consumerStats.getUnackedMessages());
            }
        }
        System.out.println();
        {
            List<? extends ConsumerStats> consumers = pulsarAdmin.topics().getStats(myTopic2).getSubscriptions().get(myTopic2_subscriptionName).getConsumers();
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerStats consumerStats = consumers.get(i);
                System.out.println(myTopic2 + " getConsumerName=" + consumerStats.getConsumerName() + " getUnackedMessages=" + consumerStats.getUnackedMessages());
            }
        }
        return "成功获得";
    }
}

result:
myTopic1 getConsumerName=消费者1 getUnackedMessages=0

myTopic2 getConsumerName=消费者2 getUnackedMessages=0

run acknowledge or acknowledgeCumulative method , getUnackedMessages result both 0 .

why ?? thank you !



GitHub link: https://github.com/apache/pulsar/discussions/17978#discussioncomment-3835999

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org