You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/26 00:47:31 UTC

[GitHub] [pulsar-client-go] flowchartsman opened a new issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()

flowchartsman opened a new issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452


   I am creating a reader to read backwards in time off of a topic, and I have noticed that I receive the above error if I seek by time after creating the reader with `StartMessageID: pulsar.LatestMessageID()` if I use `pulsar.EarliestMessageID()` it seems to behave correctly:
   
   ```
   ../countconsumer localhost:6651 testten testnam testtopic
   new client
   persistent://testten/testnam/testtopic
   new reader
   INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6651"
   INFO[0000] [TCP connection established]                  local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connection is ready]                         local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6651"
   INFO[0000] [TCP connection established]                  local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connection is ready]                         local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connected consumer]                          consumerID=1 name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   INFO[0000] [Created consumer]                            consumerID=1 name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   new seek
   INFO[0000] Broker notification of Closed consumer: [1]   local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Reconnecting to broker in  100ms]            consumerID=1 name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   ERRO[0000] [Failed to get last message id]               consumerID=1 error="server error: MetadataError: Consumer not found" name= subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   ERRO[0000] [Failed to get last message id from broker]   error="server error: MetadataError: Consumer not found" topic="persistent://testten/testnam/testtopic"
   ```
   
   Code attached.
   
   ```go
   package main
   
   import (
   	"context"
   	"fmt"
   	"log"
   	"os"
   	"os/signal"
   	"strings"
   	"time"
   
   	"github.com/dustin/go-humanize"
   	"go.uber.org/atomic"
   
   	"github.com/apache/pulsar-client-go/pulsar"
   )
   
   // TopicName returns a fully-qualified pulsar topic string
   func TopicName(tenant, namespace, topic string) string {
   	return fmt.Sprintf("persistent://%s/%s/%s", tenant, namespace, topic)
   }
   
   func main() {
   	log.SetFlags(0)
   
   	log.Println("new client")
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://" + os.Args[1],
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer client.Close()
   
   	topic := TopicName(os.Args[2], os.Args[3], os.Args[4])
   	log.Println(topic)
   
   	log.Println("new reader")
   	reader, err := client.CreateReader(pulsar.ReaderOptions{
   		Topic: topic,
   		StartMessageID:          pulsar.LatestMessageID(),
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   	log.Println("new seek")
   	time.Sleep(100 * time.Millisecond)
   	reader.SeekByTime(time.Now().Add(-72 * time.Hour))
   	time.Sleep(100 * time.Millisecond)
   
   	ctx, cFunc := context.WithCancel(context.Background())
   	sig := make(chan os.Signal, 1)
   	signal.Notify(sig, os.Interrupt, os.Kill)
   	go func() {
   		<-sig
   		cFunc()
   	}()
   
   	var maxPayloadSize int
   	var count atomic.Uint32
   	go func() {
   		t := time.NewTicker(3 * time.Second)
   		for {
   			<-t.C
   			log.Printf("entries: %d\n", count.Load())
   		}
   	}()
   MainLoop:
   	for {
   		select {
   		case <-ctx.Done():
   			log.Println("closing gracefully")
   			reader.Close()
   			break MainLoop
   		default:
   			// get message
   		}
   		if reader.HasNext() {
   			msg, err := reader.Next(ctx)
   			if err != nil {
   				log.Println("consumer receive error:", err)
   				continue
   			}
   			count.Add(1)
   
   			if len(msg.Payload()) > maxPayloadSize {
   				maxPayloadSize = len(msg.Payload())
   				h := humanize.Bytes(uint64(maxPayloadSize))
   				log.Printf("new max payload size: %d (%s)\n", maxPayloadSize, h)
   				filename := strings.ReplaceAll(fmt.Sprintf("%s-%s", os.Args[4], h), " ", "")
   				out, _ := os.Create(filename)
   				out.Write(msg.Payload())
   				if err := out.Close(); err != nil {
   					log.Fatal(err)
   				}
   			}
   
   		} else {
   			log.Println("done!")
   			break MainLoop
   		}
   	}
   }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] longtengz commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()

Posted by GitBox <gi...@apache.org>.
longtengz commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-870323570


   I have the same error 
   
   ```
   ERRO[0027] [Failed to get last message id from broker]   error="server error: MetadataError: Consumer not found" topic="persistent://public/default/xxx"
   ERRO[0027] [Failed to get last message id]               consumerID=1 error="server error: MetadataError: Consumer not found" name= subscription=reader-fjjyc topic="persistent://public/default/xxx"
   ```
   
   even if I set `StartMessageID` to `pulsar.EarliestMessageID()`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] longtengz commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()

Posted by GitBox <gi...@apache.org>.
longtengz commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-870323570


   I have the same error 
   
   ```
   ERRO[0027] [Failed to get last message id from broker]   error="server error: MetadataError: Consumer not found" topic="persistent://public/default/xxx"
   ERRO[0027] [Failed to get last message id]               consumerID=1 error="server error: MetadataError: Consumer not found" name= subscription=reader-fjjyc topic="persistent://public/default/xxx"
   ```
   
   even if I set `StartMessageID` to `pulsar.EarliestMessageID()`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-819237890


   Any progress on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman removed a comment on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()

Posted by GitBox <gi...@apache.org>.
flowchartsman removed a comment on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-819237890


   Any progress on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] brzyangg commented on issue #452: MetadataError: Consumer not found when instantiating a reader with StartMessageID: pulsar.LatestMessageID()

Posted by GitBox <gi...@apache.org>.
brzyangg commented on issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452#issuecomment-768763291


   +1
   The value of StartMessageID is always `newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)`
   Cause the Reader to set StartMessageID is LeastMessageID and the message cannot be read


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org