You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/26 00:47:31 UTC
[GitHub] [pulsar-client-go] flowchartsman opened a new issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()
flowchartsman opened a new issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452
I am creating a reader to read backwards in time off of a topic, and I have noticed that I receive the above error if I seek by time after creating the reader with `StartMessageID: pulsar.LatestMessageID()` if I use `pulsar.EarliestMessageID()` it seems to behave correctly:
```
../countconsumer localhost:6651 testten testnam testtopic
new client
persistent://testten/testnam/testtopic
new reader
INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6651"
INFO[0000] [TCP connection established] local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connection is ready] local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6651"
INFO[0000] [TCP connection established] local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connection is ready] local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connected consumer] consumerID=1 name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
INFO[0000] [Created consumer] consumerID=1 name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
new seek
INFO[0000] Broker notification of Closed consumer: [1] local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
INFO[0000] [Reconnecting to broker in 100ms] consumerID=1 name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
ERRO[0000] [Failed to get last message id] consumerID=1 error="server error: MetadataError: Consumer not found" name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
ERRO[0000] [Failed to get last message id from broker] error="server error: MetadataError: Consumer not found" topic="persistent://testten/testnam/testtopic"
```
Code attached.
```go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"time"
"github.com/dustin/go-humanize"
"go.uber.org/atomic"
"github.com/apache/pulsar-client-go/pulsar"
)
// TopicName returns a fully-qualified pulsar topic string
func TopicName(tenant, namespace, topic string) string {
return fmt.Sprintf("persistent://%s/%s/%s", tenant, namespace, topic)
}
func main() {
log.SetFlags(0)
log.Println("new client")
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://" + os.Args[1],
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
topic := TopicName(os.Args[2], os.Args[3], os.Args[4])
log.Println(topic)
log.Println("new reader")
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: topic,
StartMessageID: pulsar.LatestMessageID(),
})
if err != nil {
log.Fatal(err)
}
log.Println("new seek")
time.Sleep(100 * time.Millisecond)
reader.SeekByTime(time.Now().Add(-72 * time.Hour))
time.Sleep(100 * time.Millisecond)
ctx, cFunc := context.WithCancel(context.Background())
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, os.Kill)
go func() {
<-sig
cFunc()
}()
var maxPayloadSize int
var count atomic.Uint32
go func() {
t := time.NewTicker(3 * time.Second)
for {
<-t.C
log.Printf("entries: %d\n", count.Load())
}
}()
MainLoop:
for {
select {
case <-ctx.Done():
log.Println("closing gracefully")
reader.Close()
break MainLoop
default:
// get message
}
if reader.HasNext() {
msg, err := reader.Next(ctx)
if err != nil {
log.Println("consumer receive error:", err)
continue
}
count.Add(1)
if len(msg.Payload()) > maxPayloadSize {
maxPayloadSize = len(msg.Payload())
h := humanize.Bytes(uint64(maxPayloadSize))
log.Printf("new max payload size: %d (%s)\n", maxPayloadSize, h)
filename := strings.ReplaceAll(fmt.Sprintf("%s-%s", os.Args[4], h), " ", "")
out, _ := os.Create(filename)
out.Write(msg.Payload())
if err := out.Close(); err != nil {
log.Fatal(err)
}
}
} else {
log.Println("done!")
break MainLoop
}
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] longtengz commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()
Posted by GitBox <gi...@apache.org>.
longtengz commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-870323570
I have the same error
```
ERRO[0027] [Failed to get last message id from broker] error="server error: MetadataError: Consumer not found" topic="persistent://public/default/xxx"
ERRO[0027] [Failed to get last message id] consumerID=1 error="server error: MetadataError: Consumer not found" name= subscription=reader-fjjyc topic="persistent://public/default/xxx"
```
even if I set `StartMessageID` to `pulsar.EarliestMessageID()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] longtengz commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()
Posted by GitBox <gi...@apache.org>.
longtengz commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-870323570
I have the same error
```
ERRO[0027] [Failed to get last message id from broker] error="server error: MetadataError: Consumer not found" topic="persistent://public/default/xxx"
ERRO[0027] [Failed to get last message id] consumerID=1 error="server error: MetadataError: Consumer not found" name= subscription=reader-fjjyc topic="persistent://public/default/xxx"
```
even if I set `StartMessageID` to `pulsar.EarliestMessageID()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] flowchartsman commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()
Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-819237890
Any progress on this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] flowchartsman removed a comment on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()
Posted by GitBox <gi...@apache.org>.
flowchartsman removed a comment on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-819237890
Any progress on this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] brzyangg commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()
Posted by GitBox <gi...@apache.org>.
brzyangg commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-768763291
+1
The value of StartMessageID is always `newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)`
Cause the Reader to set StartMessageID is LeastMessageID and the message cannot be read
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org