Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/20 03:43:01 UTC
[GitHub] [pulsar] stevenwangnarvar opened a new issue #5235: lost message when not ack in partitioned topics in go client
stevenwangnarvar opened a new issue #5235: lost message when not ack in partitioned topics in go client
URL: https://github.com/apache/pulsar/issues/5235
**Describe the bug**
I tried to test the fix in https://github.com/apache/pulsar/pull/4653, but found that messages are lost when unacknowledged messages are reprocessed.
**To Reproduce**
Steps to reproduce the behavior:
1. Create a partitioned topic
2. Run a test Go program that sends several messages and receives them without acknowledging them.
3. Inspect the received messages and check whether any are lost.
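
The check in step 3 can be done by eye from the console output, but a small helper makes it less error-prone. The following is a minimal sketch, not part of the original test program: `missingPayloads` is a hypothetical helper, and the `received` slice in `main` is made-up illustration data rather than output captured from the broker. Duplicates are ignored on purpose, since unacknowledged messages are expected to be redelivered after the ack timeout.

```go
package main

import (
	"fmt"
	"sort"
)

// missingPayloads (hypothetical helper) returns every payload in sent that
// never appears in received. Duplicates in received are ignored, because
// redelivery of unacknowledged messages is expected behavior.
func missingPayloads(sent, received []string) []string {
	seen := make(map[string]bool, len(received))
	for _, p := range received {
		seen[p] = true
	}
	var missing []string
	for _, p := range sent {
		if !seen[p] {
			missing = append(missing, p)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// The five payloads the test program sends.
	sent := []string{"message-0", "message-1", "message-2", "message-3", "message-4"}
	// Illustration data only: payloads seen by the consumer, with redeliveries.
	received := []string{"message-0", "message-2", "message-0", "message-3", "message-2"}
	fmt.Println("missing:", missingPayloads(sent, received))
}
```

In the test program, the payload of each received message could be appended to a slice inside the receive loop and passed to this helper after a few ack-timeout cycles have elapsed.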
**Expected behavior**
No messages should be lost.
**Screenshots**
<img width="979" alt="Screen Shot 2019-09-20 at 1 25 10 PM" src="https://user-images.githubusercontent.com/49885838/65297627-b2b43580-dbbc-11e9-9739-531696a82580.png">
<img width="798" alt="Screen Shot 2019-09-20 at 1 24 59 PM" src="https://user-images.githubusercontent.com/49885838/65297632-b8aa1680-dbbc-11e9-8e31-eaf151596201.png">
**Desktop (please complete the following information):**
- OS: Mac
**Additional context**
Queue info:
```
/pulsar-admin topics create-partitioned-topic --partitions 4 persistent://dev/genericfileproc/npulsar_test_output_topic
```
Test code
```
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar/pulsar-client-go/pulsar"
)

func main() {
	topic := "persistent://dev/genericfileproc/npulsar_test_output_topic"

	// Instantiate a Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Use the client to instantiate a producer
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Use the client object to instantiate a consumer
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "gfp-input-carrier-flat-file",
		Type:             pulsar.Shared,
		AckTimeout:       15 * time.Second,
	})
	// Check the subscribe error before using the consumer
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// Send 5 messages synchronously
	for i := 0; i < 5; i++ {
		// Create a message
		msg := pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("message-%d", i)),
		}
		// Attempt to send the message
		if err := producer.Send(ctx, msg); err != nil {
			log.Fatal(err)
		}
	}

	// Listen indefinitely on the topic
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}
		// Print each received message with a timestamp
		fmt.Println(time.Now().Format(time.RFC850) + " consume " + string(msg.Payload()))
		// Ack is intentionally left disabled so that messages are redelivered after AckTimeout
		//consumer.Ack(msg)
	}
}
```