You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/20 03:43:01 UTC

[GitHub] [pulsar] stevenwangnarvar opened a new issue #5235: lost message when not ack in partitioned topics in go client

stevenwangnarvar opened a new issue #5235: lost message when not ack in partitioned topics in go client
URL: https://github.com/apache/pulsar/issues/5235
 
 
   **Describe the bug**
   try to test this fix https://github.com/apache/pulsar/pull/4653, but found have message lost during re-process messages which are not acknowledgment.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a partitioned topic
   2. run a test go program, send several messages, and receive them but not ack them.
   3. Check the received message, check if any messages are lost.
   
   **Expected behavior**
   No message should lost.
   
   **Screenshots**
   <img width="979" alt="Screen Shot 2019-09-20 at 1 25 10 PM" src="https://user-images.githubusercontent.com/49885838/65297627-b2b43580-dbbc-11e9-9739-531696a82580.png">
   <img width="798" alt="Screen Shot 2019-09-20 at 1 24 59 PM" src="https://user-images.githubusercontent.com/49885838/65297632-b8aa1680-dbbc-11e9-8e31-eaf151596201.png">
   
   
   **Desktop (please complete the following information):**
    - OS: Mac
   
   **Additional context**
   Queue info:
   ```
   /pulsar-admin topics create-partitioned-topic --partitions 4 persistent://dev/genericfileproc/npulsar_test_output_topic
   ```
   Test code 
   ```
   package main
   
   import (
   	"context"
   	"fmt"
   	"log"
   	"time"
   
   	"github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   	topic := "persistent://dev/genericfileproc/npulsar_test_output_topic"
   
   	// Instantiate a Pulsar client
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://localhost:6650",
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	// Use the client to instantiate a producer
   	producer, err := client.CreateProducer(pulsar.ProducerOptions{
   		Topic: topic,
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	// Use the client object to instantiate a consumer
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:            topic,
   		SubscriptionName: "gfp-input-carrier-flat-file",
   		Type:             pulsar.Shared,
   		AckTimeout:       15 * time.Second,
   	})
   
   	ctx := context.Background()
   	// Send 5 messages synchronously
   	for i := 0; i < 5; i++ {
   		// Create a message
   		msg := pulsar.ProducerMessage{
   			Payload: []byte(fmt.Sprintf("message-%d", i)),
   		}
   		// Attempt to send the message
   		if err := producer.Send(ctx, msg); err != nil {
   			log.Fatal(err)
   		}
   	}
   
   	// Listen indefinitely on the topic
   	for {
   		msg, err := consumer.Receive(ctx)
   		if err != nil {
   			log.Fatal(err)
   		}
   		// Do something with the message
   		fmt.Println("" + time.Now().Format(time.RFC850) + " consume " + string(msg.Payload()))
   		//consumer.Ack(msg)
   	}
   }
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services