You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/08 07:42:16 UTC

[GitHub] [pulsar-client-go] k-pisey opened a new issue #583: Reader can not read next coming messages when start with LatestMessageID

k-pisey opened a new issue #583:
URL: https://github.com/apache/pulsar-client-go/issues/583


   #### Expected behavior
   
   It should be able to receive messages where published afterward, Message payload should be printed.
   #### Actual behavior
   Nothing received by reader, seems no message being published.
   
   #### Steps to reproduce
   ```go
   package main
   
   import (
   	"context"
   	"fmt"
   	"github.com/apache/pulsar-client-go/pulsar"
   	"log"
   	"os"
   )
   
   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://localhost:6650",
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer client.Close()
   
   	reader, err := client.CreateReader(pulsar.ReaderOptions{
   		Topic:                   "my-topic",
   		StartMessageID:          pulsar.LatestMessageID(),
   	})
   	if err != nil {
   		fmt.Fprintln(os.Stderr, "error creating reader for topic", err)
   		os.Exit(1)
   	}
   	defer reader.Close()
   
   	producer, err := client.CreateProducer(pulsar.ProducerOptions{
   		Topic: "my-topic",
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer producer.Close()
   
   	for i := 0; i < 100; i++ {
   		_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
   			Payload: []byte{byte(i)},
   		})
   		if err != nil {
   			log.Fatal(err)
   		}
   	}
   	for {
   		if reader.HasNext() {
   			msg, err := reader.Next(context.Background())
   			if err != nil {
   				fmt.Fprintln(os.Stderr, "error reading message", err)
   				os.Exit(1)
   			}
   			fmt.Fprintln(os.Stdout, "Message payload: ", msg.Payload())
   		} else {
   			break
   		}
   	}
   }
   ```
   
   
   #### System configuration
   **Pulsar version**: x.y
   - pulsar:2.7.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on issue #583: Reader can not read next coming messages when start with LatestMessageID

Posted by GitBox <gi...@apache.org>.
cckellogg commented on issue #583:
URL: https://github.com/apache/pulsar-client-go/issues/583#issuecomment-895568802


   How are you publishing messages? Do you have retention configured for the topic or a subscription associated with it? The default behavior is to discard messages on a topic unless retention is configured or there is a subscription for that topic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman edited a comment on issue #583: Reader can not read next coming messages when start with LatestMessageID

Posted by GitBox <gi...@apache.org>.
flowchartsman edited a comment on issue #583:
URL: https://github.com/apache/pulsar-client-go/issues/583#issuecomment-926305205


   I was also seeing this issue with the following test program
   
   ```go
   package main
   
   import (
   	"context"
   	"errors"
   	"flag"
   	"fmt"
   	"log"
   	"os"
   	"os/signal"
   	"time"
   
   	"github.com/apache/pulsar-client-go/pulsar"
   )
   
   func main() {
   	log.SetFlags(0)
   	broker := flag.String("broker", "localhost:6650", "broker in the form of host:port")
   	tail := flag.Bool("f", false, "tail the topic")
   	jump := flag.String("j", "latest", "where to jump to [earliest|latest|-1h]")
   	flag.Parse()
   	flag.CommandLine.Args()
   	if len(flag.CommandLine.Args()) != 1 {
   		log.Fatal("usage: ./reader [flags] \"<tenant>/<namespace>/<topic>\"")
   	}
   
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://" + *broker,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer client.Close()
   
   	topic := "persistent://" + flag.CommandLine.Args()[0]
   	log.Println("reading from", topic)
   
   	log.Println("new reader")
   	var whence pulsar.MessageID
   	var jumpwindow time.Duration
   
   	switch *jump {
   	case "latest":
   		whence = pulsar.LatestMessageID()
   		log.Println("\"latest\" tails by default")
   		*tail = true
   		// check no tail
   	case "earliest":
   		whence = pulsar.EarliestMessageID()
   	default:
   		whence = pulsar.LatestMessageID()
   		dur, err := time.ParseDuration(*jump)
   		if err != nil {
   			log.Fatalf("invalid jump %q, expecting \"earliest\", \"latest\" or negative time duration", *jump)
   		}
   		if dur > -1*time.Minute {
   			log.Fatalf("jump duration must be at least -1m and cannot be positive. You gave %q", *jump)
   		}
   		jumpwindow = dur
   	}
   	reader, err := client.CreateReader(pulsar.ReaderOptions{
   		Topic:                   topic,
   		StartMessageIDInclusive: true,
   		StartMessageID:          whence,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	if jumpwindow != 0 {
   		reader.SeekByTime(time.Now().Add(jumpwindow))
   	}
   	// log.Println("new seek")
   	// time.Sleep(100 * time.Millisecond)
   	// reader.SeekByTime(time.Now().Add(-72 * time.Hour))
   	// time.Sleep(100 * time.Millisecond)
   
   	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
   	defer cancel()
   
   	go func() {
   		<-ctx.Done()
   		time.Sleep(3 * time.Second)
   		log.Fatal("timed out waiting for consumer to die")
   	}()
   
   	var next func() bool
   	if *tail {
   		next = func() bool { return true }
   	} else {
   		next = reader.HasNext
   	}
   
   	for next() {
   		msg, err := reader.Next(ctx)
   		if err != nil {
   			if errors.Is(err, context.Canceled) {
   				log.Fatal("exited")
   			}
   			log.Fatalf("consumer receive error: %v", err)
   		}
   		fmt.Println(string(msg.Payload()))
   	}
   }
   ```
   
   If I used `-f` and `-j earliest`, the reader worked as intended and I saw the messages and new ones print out.  If I used `-j latest`, nothing came through, however I rebuilt the project and it strangely started working again, so reproduction is flakey. Leaving the code here in case anyone else is seeing it and wants to reproduce.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman commented on issue #583: Reader can not read next coming messages when start with LatestMessageID

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #583:
URL: https://github.com/apache/pulsar-client-go/issues/583#issuecomment-926305205


   i'm also seeing this issue with the following test program
   
   ```go
   package main
   
   import (
   	"context"
   	"errors"
   	"flag"
   	"fmt"
   	"log"
   	"os"
   	"os/signal"
   	"time"
   
   	"github.com/apache/pulsar-client-go/pulsar"
   )
   
   func main() {
   	log.SetFlags(0)
   	broker := flag.String("broker", "localhost:6650", "broker in the form of host:port")
   	tail := flag.Bool("f", false, "tail the topic")
   	jump := flag.String("j", "latest", "where to jump to [earliest|latest|-1h]")
   	flag.Parse()
   	flag.CommandLine.Args()
   	if len(flag.CommandLine.Args()) != 1 {
   		log.Fatal("usage: ./reader [flags] \"<tenant>/<namespace>/<topic>\"")
   	}
   
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://" + *broker,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer client.Close()
   
   	topic := "persistent://" + flag.CommandLine.Args()[0]
   	log.Println("reading from", topic)
   
   	log.Println("new reader")
   	var whence pulsar.MessageID
   	var jumpwindow time.Duration
   
   	switch *jump {
   	case "latest":
   		whence = pulsar.LatestMessageID()
   		log.Println("\"latest\" tails by default")
   		*tail = true
   		// check no tail
   	case "earliest":
   		whence = pulsar.EarliestMessageID()
   	default:
   		whence = pulsar.LatestMessageID()
   		dur, err := time.ParseDuration(*jump)
   		if err != nil {
   			log.Fatalf("invalid jump %q, expecting \"earliest\", \"latest\" or negative time duration", *jump)
   		}
   		if dur > -1*time.Minute {
   			log.Fatalf("jump duration must be at least -1m and cannot be positive. You gave %q", *jump)
   		}
   		jumpwindow = dur
   	}
   	reader, err := client.CreateReader(pulsar.ReaderOptions{
   		Topic:                   topic,
   		StartMessageIDInclusive: true,
   		StartMessageID:          whence,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	if jumpwindow != 0 {
   		reader.SeekByTime(time.Now().Add(jumpwindow))
   	}
   	// log.Println("new seek")
   	// time.Sleep(100 * time.Millisecond)
   	// reader.SeekByTime(time.Now().Add(-72 * time.Hour))
   	// time.Sleep(100 * time.Millisecond)
   
   	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
   	defer cancel()
   
   	go func() {
   		<-ctx.Done()
   		time.Sleep(3 * time.Second)
   		log.Fatal("timed out waiting for consumer to die")
   	}()
   
   	var next func() bool
   	if *tail {
   		next = func() bool { return true }
   	} else {
   		next = reader.HasNext
   	}
   
   	for next() {
   		msg, err := reader.Next(ctx)
   		if err != nil {
   			if errors.Is(err, context.Canceled) {
   				log.Fatal("exited")
   			}
   			log.Fatalf("consumer receive error: %v", err)
   		}
   		fmt.Println(string(msg.Payload()))
   	}
   }
   ```
   
   If I use `-f` and `-j earliest`, the reader works as intended and I see the messages and new ones print out.  If I use `-j latest`, nothing comes through.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] k-pisey commented on issue #583: Reader can not read next coming messages when start with LatestMessageID

Posted by GitBox <gi...@apache.org>.
k-pisey commented on issue #583:
URL: https://github.com/apache/pulsar-client-go/issues/583#issuecomment-895667112


   Publishing and reading messages happen in a single program attached above, reader was created before producer therefore there is a subscription associated with the given topic, retention was not configured.
   
   Additional to above configuration, to make it clear regarding subscription,  I established a consuming to the given topic via `admin cli` then start the program. I only got messages at consuming terminal.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org