You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/24 03:00:54 UTC
[GitHub] [pulsar-client-go] flowchartsman commented on issue #583: Reader can not read next coming messages when start with LatestMessageID
flowchartsman commented on issue #583:
URL: https://github.com/apache/pulsar-client-go/issues/583#issuecomment-926305205
I'm also seeing this issue with the following test program:
```go
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// main is a small command-line topic reader used to reproduce the issue.
//
// Flags:
//
//	-broker  broker address in host:port form (prefixed with "pulsar://").
//	-f       keep calling Next forever instead of stopping when HasNext
//	         reports false.
//	-j       start position: "earliest", "latest", or a negative duration
//	         of at least one minute (for example "-1h").
//
// Exactly one positional argument is required: the topic path, which is
// prefixed with "persistent://". Each received payload is printed to stdout.
func main() {
	log.SetFlags(0)
	broker := flag.String("broker", "localhost:6650", "broker in the form of host:port")
	tail := flag.Bool("f", false, "tail the topic")
	jump := flag.String("j", "latest", "where to jump to [earliest|latest|-1h]")
	flag.Parse()

	args := flag.Args()
	if len(args) != 1 {
		log.Fatal("usage: ./reader [flags] \"<tenant>/<namespace>/<topic>\"")
	}

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://" + *broker,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic := "persistent://" + args[0]
	log.Println("reading from", topic)
	log.Println("new reader")

	// whence is the message ID the reader starts from; jumpwindow stays zero
	// unless a relative duration was requested with -j.
	var whence pulsar.MessageID
	var jumpwindow time.Duration
	switch *jump {
	case "latest":
		whence = pulsar.LatestMessageID()
		log.Println("\"latest\" tails by default")
		*tail = true
	case "earliest":
		whence = pulsar.EarliestMessageID()
	default:
		// Anything else must parse as a duration; the reader is created at
		// the latest message and then repositioned with SeekByTime below.
		whence = pulsar.LatestMessageID()
		dur, err := time.ParseDuration(*jump)
		if err != nil {
			log.Fatalf("invalid jump %q, expecting \"earliest\", \"latest\" or negative time duration", *jump)
		}
		if dur > -1*time.Minute {
			log.Fatalf("jump duration must be at least -1m and cannot be positive. You gave %q", *jump)
		}
		jumpwindow = dur
	}

	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:                   topic,
		StartMessageIDInclusive: true,
		StartMessageID:          whence,
	})
	if err != nil {
		log.Fatal(err)
	}
	// Only runs on a normal return (non-tailing mode once HasNext is false);
	// the log.Fatal paths exit the process without running deferred calls.
	defer reader.Close()

	if jumpwindow != 0 {
		// Surface seek failures instead of silently reading from "latest".
		if err := reader.SeekByTime(time.Now().Add(jumpwindow)); err != nil {
			log.Fatalf("seek by time: %v", err)
		}
	}
	// log.Println("new seek")
	// time.Sleep(100 * time.Millisecond)
	// reader.SeekByTime(time.Now().Add(-72 * time.Hour))
	// time.Sleep(100 * time.Millisecond)

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Watchdog: once the context is done (interrupt received), give the read
	// loop three seconds to exit before forcing the process down.
	go func() {
		<-ctx.Done()
		time.Sleep(3 * time.Second)
		log.Fatal("timed out waiting for consumer to die")
	}()

	// next decides whether the loop keeps going: always when tailing,
	// otherwise only while the reader reports more messages.
	var next func() bool
	if *tail {
		next = func() bool { return true }
	} else {
		next = reader.HasNext
	}

	for next() {
		msg, err := reader.Next(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				log.Fatal("exited")
			}
			log.Fatalf("consumer receive error: %v", err)
		}
		fmt.Println(string(msg.Payload()))
	}
}
```
If I use `-f` and `-j earliest`, the reader works as intended and I see the messages and new ones print out. If I use `-j latest`, nothing comes through.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org