You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:41 UTC

[pulsar-client-go] 04/38: Create producer session

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit ccc596a716e0a15944fc3d270f842cffad8ec1fb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Mar 30 07:47:31 2019 -0700

    Create producer session
---
 pulsar/impl/commands.go           |   4 ++
 pulsar/impl/connection.go         |   7 ++-
 pulsar/impl/connection_pool.go    |  14 +++--
 pulsar/impl/lookup_service.go     |  34 ++++++++++--
 pulsar/impl/rpc_client.go         |  30 +++++++---
 pulsar/impl_client.go             |  10 +++-
 pulsar/impl_partition_producer.go | 112 ++++++++++++++++++++++++++++----------
 pulsar/impl_producer.go           |  50 ++++++++---------
 pulsar/message.go                 |   4 +-
 pulsar/producer.go                |   4 +-
 10 files changed, 186 insertions(+), 83 deletions(-)

diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index 90760e1..16e5b60 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -24,6 +24,10 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
 		cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
 		break
 
+	case pb.BaseCommand_PRODUCER:
+		cmd.Producer = msg.(*pb.CommandProducer)
+		break
+
 	default:
 		log.Panic("Missing command type: ", cmdType)
 	}
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 6379f6a..28871cb 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -5,6 +5,7 @@ import (
 	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
 	"net"
+	"net/url"
 	pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"sync"
 	"sync/atomic"
@@ -51,11 +52,11 @@ type connection struct {
 	pendingReqs      map[uint64]*request
 }
 
-func newConnection(logicalAddr string, physicalAddr string) *connection {
+func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
 	cnx := &connection{
 		state:        connectionInit,
-		logicalAddr:  logicalAddr,
-		physicalAddr: physicalAddr,
+		logicalAddr:  logicalAddr.Host,
+		physicalAddr: physicalAddr.Host,
 		writeBuffer:  NewBuffer(4096),
 		log:          log.WithField("raddr", physicalAddr),
 		pendingReqs:  make(map[uint64]*request),
diff --git a/pulsar/impl/connection_pool.go b/pulsar/impl/connection_pool.go
index 676f205..1fa647f 100644
--- a/pulsar/impl/connection_pool.go
+++ b/pulsar/impl/connection_pool.go
@@ -1,11 +1,13 @@
 package impl
 
 import (
+	log "github.com/sirupsen/logrus"
+	"net/url"
 	"sync"
 )
 
 type ConnectionPool interface {
-	GetConnection(logicalAddr string, physicalAddr string) (Connection, error)
+	GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
 
 	// Close all the connections in the pool
 	Close()
@@ -19,11 +21,13 @@ func NewConnectionPool() ConnectionPool {
 	return &connectionPool{}
 }
 
-func (p *connectionPool) GetConnection(logicalAddr string, physicalAddr string) (Connection, error) {
-	cachedCnx, found := p.pool.Load(logicalAddr)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
+	cachedCnx, found := p.pool.Load(logicalAddr.Host)
 	if found {
 		cnx := cachedCnx.(*connection)
-		if err := cnx.waitUntilReady(); err != nil {
+		log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
+
+		if err := cnx.waitUntilReady(); err == nil {
 			// Connection is ready to be used
 			return cnx, nil
 		} else {
@@ -33,7 +37,7 @@ func (p *connectionPool) GetConnection(logicalAddr string, physicalAddr string)
 	}
 
 	// Try to create a new connection
-	newCnx, wasCached := p.pool.LoadOrStore(logicalAddr, newConnection(logicalAddr, physicalAddr))
+	newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, newConnection(logicalAddr, physicalAddr))
 	cnx := newCnx.(*connection)
 	if !wasCached {
 		cnx.start()
diff --git a/pulsar/impl/lookup_service.go b/pulsar/impl/lookup_service.go
index 0ebf424..afd0dda 100644
--- a/pulsar/impl/lookup_service.go
+++ b/pulsar/impl/lookup_service.go
@@ -1,6 +1,8 @@
 package impl
 
 import (
+	"errors"
+	"fmt"
 	log "github.com/sirupsen/logrus"
 	"net/url"
 	pb "pulsar-client-go-native/pulsar/pulsar_proto"
@@ -16,18 +18,22 @@ type LookupService interface {
 }
 
 type lookupService struct {
-	rpcClient RpcClient
+	rpcClient  RpcClient
+	serviceUrl *url.URL
 }
 
-func NewLookupService(rpcClient RpcClient) LookupService {
+func NewLookupService(rpcClient RpcClient, serviceUrl *url.URL) LookupService {
 	return &lookupService{
-		rpcClient: rpcClient,
+		rpcClient:  rpcClient,
+		serviceUrl: serviceUrl,
 	}
 }
 
 func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
 	// Follow brokers redirect up to certain number of times
-	for i := 0; i < 20; i++ {
+	const lookupResultMaxRedirect = 20
+
+	for i := 0; i < lookupResultMaxRedirect; i++ {
 		id := ls.rpcClient.NewRequestId()
 		res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
 			RequestId: &id,
@@ -42,6 +48,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
 		lr := res.Response.LookupTopicResponse
 		switch *lr.Response {
 		case pb.CommandLookupTopicResponse_Redirect:
+			// TODO: Handle redirects
 			log.WithField("topic", topic).Infof("Follow redirect to broker. %v / %v - Use proxy: %v",
 				lr.BrokerServiceUrl, lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
 			break
@@ -49,12 +56,27 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
 		case pb.CommandLookupTopicResponse_Connect:
 			log.WithField("topic", topic).Infof("Successfully looked up topic on broker. %s / %s - Use proxy: %t",
 				lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
-			return nil, nil
+
+			logicalAddress, err := url.Parse(lr.GetBrokerServiceUrl())
+			if err != nil {
+				return nil, err
+			}
+
+			var physicalAddr *url.URL
+			if lr.GetProxyThroughServiceUrl() {
+				physicalAddr = ls.serviceUrl
+			} else {
+				physicalAddr = logicalAddress
+			}
+			return &LookupResult{
+				LogicalAddr:  logicalAddress,
+				PhysicalAddr: physicalAddr,
+			}, nil
 
 		case pb.CommandLookupTopicResponse_Failed:
 			log.WithField("topic", topic).Warn("Failed to lookup topic",
 				lr.Error.String())
-			return nil, nil
+			return nil, errors.New(fmt.Sprintf("failed to lookup topic: %s", lr.Error.String()))
 		}
 	}
 
diff --git a/pulsar/impl/rpc_client.go b/pulsar/impl/rpc_client.go
index fd76569..2aa6106 100644
--- a/pulsar/impl/rpc_client.go
+++ b/pulsar/impl/rpc_client.go
@@ -17,31 +17,37 @@ type RpcClient interface {
 	// Create a new unique request id
 	NewRequestId() uint64
 
+	NewProducerId() uint64
+
+	NewConsumerId() uint64
+
 	// Send a request and block until the result is available
 	RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
 
-	Request(logicalAddr string, physicalAddr string, requestId uint64,
+	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestId uint64,
 		cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
 }
 
 type rpcClient struct {
-	hostPort           string
-	pool               ConnectionPool
-	requestIdGenerator uint64
+	serviceUrl          *url.URL
+	pool                ConnectionPool
+	requestIdGenerator  uint64
+	producerIdGenerator uint64
+	consumerIdGenerator uint64
 }
 
 func NewRpcClient(serviceUrl *url.URL, pool ConnectionPool) RpcClient {
 	return &rpcClient{
-		hostPort: serviceUrl.Host,
-		pool:     pool,
+		serviceUrl: serviceUrl,
+		pool:       pool,
 	}
 }
 
 func (c *rpcClient) RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
-	return c.Request(c.hostPort, c.hostPort, requestId, cmdType, message)
+	return c.Request(c.serviceUrl, c.serviceUrl, requestId, cmdType, message)
 }
 
-func (c *rpcClient) Request(logicalAddr string, physicalAddr string, requestId uint64,
+func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestId uint64,
 	cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
 	// TODO: Add retry logic in case of connection issues
 	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
@@ -69,3 +75,11 @@ func (c *rpcClient) Request(logicalAddr string, physicalAddr string, requestId u
 func (c *rpcClient) NewRequestId() uint64 {
 	return atomic.AddUint64(&c.requestIdGenerator, 1)
 }
+
+func (c *rpcClient) NewProducerId() uint64 {
+	return atomic.AddUint64(&c.producerIdGenerator, 1)
+}
+
+func (c *rpcClient) NewConsumerId() uint64 {
+	return atomic.AddUint64(&c.consumerIdGenerator, 1)
+}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 4a7047f..a1f6e10 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -15,7 +15,9 @@ type client struct {
 	rpcClient     impl.RpcClient
 	lookupService impl.LookupService
 
-	handlers map[impl.Closable]bool
+	handlers            map[impl.Closable]bool
+	producerIdGenerator uint64
+	consumerIdGenerator uint64
 }
 
 func newClient(options ClientOptions) (Client, error) {
@@ -37,12 +39,13 @@ func newClient(options ClientOptions) (Client, error) {
 		cnxPool: impl.NewConnectionPool(),
 	}
 	c.rpcClient = impl.NewRpcClient(url, c.cnxPool)
-	c.lookupService = impl.NewLookupService(c.rpcClient)
+	c.lookupService = impl.NewLookupService(c.rpcClient, url)
+	c.handlers = make(map[impl.Closable]bool)
 	return c, nil
 }
 
 func (client *client) CreateProducer(options ProducerOptions) (Producer, error) {
-	producer, err := newProducer(client, options)
+	producer, err := newProducer(client, &options)
 	if err == nil {
 		client.handlers[producer] = true
 	}
@@ -92,6 +95,7 @@ func (client *client) TopicPartitions(topic string) ([]string, error) {
 	}
 }
 
+
 func (client *client) Close() error {
 	for handler := range client.handlers {
 		if err := handler.Close(); err != nil {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index d035ad1..4ba5506 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -3,8 +3,8 @@ package pulsar
 import (
 	"context"
 	log "github.com/sirupsen/logrus"
-	//"pulsar-client-go-native/pulsar/impl"
-	//pb "pulsar-client-go-native/pulsar/pulsar_proto"
+	"pulsar-client-go-native/pulsar/impl"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"sync"
 )
 
@@ -14,12 +14,27 @@ type partitionProducer struct {
 	log    *log.Entry
 	mutex  sync.Mutex
 	cond   *sync.Cond
+	cnx    impl.Connection
+
+	producerName *string
+	producerId   uint64
+
+	// Channel where app is posting messages to be published
+	eventsChan chan interface{}
 }
 
 func newPartitionProducer(client *client, topic string, options *ProducerOptions) (*partitionProducer, error) {
 
 	p := &partitionProducer{
-		log: log.WithField("topic", topic),
+		log:        log.WithField("topic", topic),
+		client:     client,
+		topic:      topic,
+		producerId: client.rpcClient.NewProducerId(),
+		eventsChan: make(chan interface{}),
+	}
+
+	if options.Name != "" {
+		p.producerName = &options.Name
 	}
 
 	err := p.grabCnx()
@@ -27,55 +42,95 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		log.WithError(err).Errorf("Failed to create producer")
 		return nil, err
 	} else {
-		log.Info("Created producer")
+		log.Info("Created producer on cnx: ")
+		go p.run()
 		return p, nil
 	}
 }
 
 func (p *partitionProducer) grabCnx() error {
-	//lr, err := p.client.lookupService.Lookup(p.topic)
-	//if err != nil {
-	//	p.log.WithError(err).Warn("Failed to lookup topic")
-	//	return err
-	//}
-
-	//id := p.client.rpcClient.NewRequestId()
-	//p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
-	//
-	//})
-	//
-	//var cnx impl.Connection
-	//cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
-	//if err != nil {
-	//	p.log.WithError(err).Warn("Failed to get connection")
-	//	return err
-	//}
-
-	//cnx.
+	lr, err := p.client.lookupService.Lookup(p.topic)
+	if err != nil {
+		p.log.WithError(err).Warn("Failed to lookup topic")
+		return err
+	}
+
+	p.log.Info("Lookup result: ", lr)
+	id := p.client.rpcClient.NewRequestId()
+	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, &pb.CommandProducer{
+		RequestId:    &id,
+		Topic:        &p.topic,
+		Encrypted:    nil,
+		Metadata:     nil,
+		ProducerId:   &p.producerId,
+		ProducerName: p.producerName,
+		Schema:       nil,
+	})
+
+	if err != nil {
+		p.log.WithError(err).Error("Failed to create producer")
+		return err
+	}
 
+	p.producerName = res.Response.ProducerSuccess.ProducerName
+	p.cnx = res.Cnx
+	p.log.WithField("cnx", res.Cnx).Info("Created producer")
 	return nil
 }
 
 func (p *partitionProducer) run() {
-
+	for {
+		i := <-p.eventsChan
+		switch v := i.(type) {
+		case *sendRequest:
+			p.log.Info("Received send request: ", v)
+			v.callback(nil, v.msg, nil)
+		}
+	}
 }
 
 func (p *partitionProducer) Topic() string {
-	return ""
+	return p.topic
 }
 
 func (p *partitionProducer) Name() string {
-	return ""
+	return *p.producerName
 }
 
-func (p *partitionProducer) Send(context.Context, ProducerMessage) error {
+func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) error {
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	var err error
+
+	p.SendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
+		err = e
+		wg.Done()
+	})
+
+	// When sending synchronously we flush immediately to avoid
+	// the increased latency and reduced throughput of batching
+	if err = p.Flush(); err != nil {
+		return err
+	}
+
+	wg.Wait()
+	return err
 	return nil
 }
 
-func (p *partitionProducer) SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) {
+type sendRequest struct {
+	ctx      context.Context
+	msg      *ProducerMessage
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
+	p.eventsChan <- &sendRequest{ctx, msg, callback}
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {
+	// TODO: return real last sequence id
 	return -1
 }
 
@@ -84,5 +139,6 @@ func (p *partitionProducer) Flush() error {
 }
 
 func (p *partitionProducer) Close() error {
+	p.log.Info("Closing producer")
 	return nil
 }
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 587b09d..abedfab 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -2,18 +2,16 @@ package pulsar
 
 import (
 	"context"
-	"fmt"
 	"pulsar-client-go-native/pulsar/impl"
-	"sync"
 )
 
 type producer struct {
 	topic         string
 	producers     []Producer
-	messageRouter func(ProducerMessage, TopicMetadata) int
+	messageRouter func(*ProducerMessage, TopicMetadata) int
 }
 
-func newProducer(client *client, options ProducerOptions) (*producer, error) {
+func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 	if options.Topic == "" {
 		return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
 	}
@@ -24,7 +22,7 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
 
 	if options.MessageRouter == nil {
 		internalRouter := impl.NewDefaultRouter(options.BatchingMaxPublishDelay)
-		p.messageRouter = func(message ProducerMessage, metadata TopicMetadata) int {
+		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
 			return internalRouter(metadata.NumPartitions())
 		}
 	}
@@ -45,12 +43,10 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
 
 	c := make(chan ProducerError, numPartitions)
 
-	for i := 0; i < numPartitions; i++ {
-		partition := i
+	for partitionIdx, partition := range partitions {
 		go func() {
-			partitionName := fmt.Sprintf("%s-partition-%d", options.Topic, partition)
-			prod, err := newPartitionProducer(client, partitionName, &options)
-			c <- ProducerError{partition, prod, err}
+			prod, err := newPartitionProducer(client, partition, options)
+			c <- ProducerError{partitionIdx, prod, err}
 		}()
 	}
 
@@ -85,22 +81,12 @@ func (p *producer) NumPartitions() uint32 {
 	return uint32(len(p.producers))
 }
 
-func (p *producer) Send(ctx context.Context, msg ProducerMessage) error {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	var err error
-
-	p.SendAsync(ctx, msg, func(message ProducerMessage, e error) {
-		err = e
-		wg.Done()
-	})
-
-	wg.Wait()
-	return err
+func (p *producer) Send(ctx context.Context, msg *ProducerMessage) error {
+	partition := p.messageRouter(msg, p)
+	return p.producers[partition].Send(ctx, msg)
 }
 
-func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
+func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
 	partition := p.messageRouter(msg, p)
 	p.producers[partition].SendAsync(ctx, msg, callback)
 }
@@ -117,9 +103,21 @@ func (p *producer) LastSequenceID() int64 {
 }
 
 func (p *producer) Flush() error {
-	return nil
+	var err error = nil
+	for _, pp := range p.producers {
+		if e := pp.Flush(); e != nil && err == nil {
+			err = e
+		}
+	}
+	return err
 }
 
 func (p *producer) Close() error {
-	return nil
+	var err error = nil
+	for _, pp := range p.producers {
+		if e := pp.Close(); e != nil && err == nil {
+			err = e
+		}
+	}
+	return err
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index ad61704..60dd989 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -83,8 +83,8 @@ func DeserializeMessageID(data []byte) MessageID {
 
 var (
 // MessageID that points to the earliest message avaialable in a topic
-//EarliestMessage MessageID = earliestMessageID()
+// TODO: EarliestMessage MessageID = earliestMessageID()
 
 // MessageID that points to the latest message
-//LatestMessage MessageID = latestMessageID()
+// TODO: LatestMessage MessageID = latestMessageID()
 )
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 90769cb..0e59e66 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -161,12 +161,12 @@ type Producer interface {
 	// This call will be blocking until is successfully acknowledged by the Pulsar broker.
 	// Example:
 	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
-	Send(context.Context, ProducerMessage) error
+	Send(context.Context, *ProducerMessage) error
 
 	// Send a message in asynchronous mode
 	// The callback will report back the message being published and
 	// the eventual error in publishing
-	SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
+	SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
 
 	// Get the last sequence id that was published by this producer.
 	// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that