You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:41 UTC
[pulsar-client-go] 04/38: Create producer session
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit ccc596a716e0a15944fc3d270f842cffad8ec1fb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Mar 30 07:47:31 2019 -0700
Create producer session
---
pulsar/impl/commands.go | 4 ++
pulsar/impl/connection.go | 7 ++-
pulsar/impl/connection_pool.go | 14 +++--
pulsar/impl/lookup_service.go | 34 ++++++++++--
pulsar/impl/rpc_client.go | 30 +++++++---
pulsar/impl_client.go | 10 +++-
pulsar/impl_partition_producer.go | 112 ++++++++++++++++++++++++++++----------
pulsar/impl_producer.go | 50 ++++++++---------
pulsar/message.go | 4 +-
pulsar/producer.go | 4 +-
10 files changed, 186 insertions(+), 83 deletions(-)
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index 90760e1..16e5b60 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -24,6 +24,10 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
break
+ case pb.BaseCommand_PRODUCER:
+ cmd.Producer = msg.(*pb.CommandProducer)
+ break
+
default:
log.Panic("Missing command type: ", cmdType)
}
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 6379f6a..28871cb 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -5,6 +5,7 @@ import (
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"net"
+ "net/url"
pb "pulsar-client-go-native/pulsar/pulsar_proto"
"sync"
"sync/atomic"
@@ -51,11 +52,11 @@ type connection struct {
pendingReqs map[uint64]*request
}
-func newConnection(logicalAddr string, physicalAddr string) *connection {
+func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
cnx := &connection{
state: connectionInit,
- logicalAddr: logicalAddr,
- physicalAddr: physicalAddr,
+ logicalAddr: logicalAddr.Host,
+ physicalAddr: physicalAddr.Host,
writeBuffer: NewBuffer(4096),
log: log.WithField("raddr", physicalAddr),
pendingReqs: make(map[uint64]*request),
diff --git a/pulsar/impl/connection_pool.go b/pulsar/impl/connection_pool.go
index 676f205..1fa647f 100644
--- a/pulsar/impl/connection_pool.go
+++ b/pulsar/impl/connection_pool.go
@@ -1,11 +1,13 @@
package impl
import (
+ log "github.com/sirupsen/logrus"
+ "net/url"
"sync"
)
type ConnectionPool interface {
- GetConnection(logicalAddr string, physicalAddr string) (Connection, error)
+ GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
// Close all the connections in the pool
Close()
@@ -19,11 +21,13 @@ func NewConnectionPool() ConnectionPool {
return &connectionPool{}
}
-func (p *connectionPool) GetConnection(logicalAddr string, physicalAddr string) (Connection, error) {
- cachedCnx, found := p.pool.Load(logicalAddr)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
+ cachedCnx, found := p.pool.Load(logicalAddr.Host)
if found {
cnx := cachedCnx.(*connection)
- if err := cnx.waitUntilReady(); err != nil {
+ log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
+
+ if err := cnx.waitUntilReady(); err == nil {
// Connection is ready to be used
return cnx, nil
} else {
@@ -33,7 +37,7 @@ func (p *connectionPool) GetConnection(logicalAddr string, physicalAddr string)
}
// Try to create a new connection
- newCnx, wasCached := p.pool.LoadOrStore(logicalAddr, newConnection(logicalAddr, physicalAddr))
+ newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, newConnection(logicalAddr, physicalAddr))
cnx := newCnx.(*connection)
if !wasCached {
cnx.start()
diff --git a/pulsar/impl/lookup_service.go b/pulsar/impl/lookup_service.go
index 0ebf424..afd0dda 100644
--- a/pulsar/impl/lookup_service.go
+++ b/pulsar/impl/lookup_service.go
@@ -1,6 +1,8 @@
package impl
import (
+ "errors"
+ "fmt"
log "github.com/sirupsen/logrus"
"net/url"
pb "pulsar-client-go-native/pulsar/pulsar_proto"
@@ -16,18 +18,22 @@ type LookupService interface {
}
type lookupService struct {
- rpcClient RpcClient
+ rpcClient RpcClient
+ serviceUrl *url.URL
}
-func NewLookupService(rpcClient RpcClient) LookupService {
+func NewLookupService(rpcClient RpcClient, serviceUrl *url.URL) LookupService {
return &lookupService{
- rpcClient: rpcClient,
+ rpcClient: rpcClient,
+ serviceUrl: serviceUrl,
}
}
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
// Follow brokers redirect up to certain number of times
- for i := 0; i < 20; i++ {
+ const lookupResultMaxRedirect = 20
+
+ for i := 0; i < lookupResultMaxRedirect; i++ {
id := ls.rpcClient.NewRequestId()
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
RequestId: &id,
@@ -42,6 +48,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
lr := res.Response.LookupTopicResponse
switch *lr.Response {
case pb.CommandLookupTopicResponse_Redirect:
+ // TODO: Handle redirects
log.WithField("topic", topic).Infof("Follow redirect to broker. %v / %v - Use proxy: %v",
lr.BrokerServiceUrl, lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
break
@@ -49,12 +56,27 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
case pb.CommandLookupTopicResponse_Connect:
log.WithField("topic", topic).Infof("Successfully looked up topic on broker. %s / %s - Use proxy: %t",
lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
- return nil, nil
+
+ logicalAddress, err := url.Parse(lr.GetBrokerServiceUrl())
+ if err != nil {
+ return nil, err
+ }
+
+ var physicalAddr *url.URL
+ if lr.GetProxyThroughServiceUrl() {
+ physicalAddr = ls.serviceUrl
+ } else {
+ physicalAddr = logicalAddress
+ }
+ return &LookupResult{
+ LogicalAddr: logicalAddress,
+ PhysicalAddr: physicalAddr,
+ }, nil
case pb.CommandLookupTopicResponse_Failed:
log.WithField("topic", topic).Warn("Failed to lookup topic",
lr.Error.String())
- return nil, nil
+ return nil, errors.New(fmt.Sprintf("failed to lookup topic: %s", lr.Error.String()))
}
}
diff --git a/pulsar/impl/rpc_client.go b/pulsar/impl/rpc_client.go
index fd76569..2aa6106 100644
--- a/pulsar/impl/rpc_client.go
+++ b/pulsar/impl/rpc_client.go
@@ -17,31 +17,37 @@ type RpcClient interface {
// Create a new unique request id
NewRequestId() uint64
+ NewProducerId() uint64
+
+ NewConsumerId() uint64
+
// Send a request and block until the result is available
RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
- Request(logicalAddr string, physicalAddr string, requestId uint64,
+ Request(logicalAddr *url.URL, physicalAddr *url.URL, requestId uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
}
type rpcClient struct {
- hostPort string
- pool ConnectionPool
- requestIdGenerator uint64
+ serviceUrl *url.URL
+ pool ConnectionPool
+ requestIdGenerator uint64
+ producerIdGenerator uint64
+ consumerIdGenerator uint64
}
func NewRpcClient(serviceUrl *url.URL, pool ConnectionPool) RpcClient {
return &rpcClient{
- hostPort: serviceUrl.Host,
- pool: pool,
+ serviceUrl: serviceUrl,
+ pool: pool,
}
}
func (c *rpcClient) RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
- return c.Request(c.hostPort, c.hostPort, requestId, cmdType, message)
+ return c.Request(c.serviceUrl, c.serviceUrl, requestId, cmdType, message)
}
-func (c *rpcClient) Request(logicalAddr string, physicalAddr string, requestId uint64,
+func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestId uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
// TODO: Add retry logic in case of connection issues
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
@@ -69,3 +75,11 @@ func (c *rpcClient) Request(logicalAddr string, physicalAddr string, requestId u
func (c *rpcClient) NewRequestId() uint64 {
return atomic.AddUint64(&c.requestIdGenerator, 1)
}
+
+func (c *rpcClient) NewProducerId() uint64 {
+ return atomic.AddUint64(&c.producerIdGenerator, 1)
+}
+
+func (c *rpcClient) NewConsumerId() uint64 {
+ return atomic.AddUint64(&c.consumerIdGenerator, 1)
+}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 4a7047f..a1f6e10 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -15,7 +15,9 @@ type client struct {
rpcClient impl.RpcClient
lookupService impl.LookupService
- handlers map[impl.Closable]bool
+ handlers map[impl.Closable]bool
+ producerIdGenerator uint64
+ consumerIdGenerator uint64
}
func newClient(options ClientOptions) (Client, error) {
@@ -37,12 +39,13 @@ func newClient(options ClientOptions) (Client, error) {
cnxPool: impl.NewConnectionPool(),
}
c.rpcClient = impl.NewRpcClient(url, c.cnxPool)
- c.lookupService = impl.NewLookupService(c.rpcClient)
+ c.lookupService = impl.NewLookupService(c.rpcClient, url)
+ c.handlers = make(map[impl.Closable]bool)
return c, nil
}
func (client *client) CreateProducer(options ProducerOptions) (Producer, error) {
- producer, err := newProducer(client, options)
+ producer, err := newProducer(client, &options)
if err == nil {
client.handlers[producer] = true
}
@@ -92,6 +95,7 @@ func (client *client) TopicPartitions(topic string) ([]string, error) {
}
}
+
func (client *client) Close() error {
for handler := range client.handlers {
if err := handler.Close(); err != nil {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index d035ad1..4ba5506 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -3,8 +3,8 @@ package pulsar
import (
"context"
log "github.com/sirupsen/logrus"
- //"pulsar-client-go-native/pulsar/impl"
- //pb "pulsar-client-go-native/pulsar/pulsar_proto"
+ "pulsar-client-go-native/pulsar/impl"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
"sync"
)
@@ -14,12 +14,27 @@ type partitionProducer struct {
log *log.Entry
mutex sync.Mutex
cond *sync.Cond
+ cnx impl.Connection
+
+ producerName *string
+ producerId uint64
+
+ // Channel where app is posting messages to be published
+ eventsChan chan interface{}
}
func newPartitionProducer(client *client, topic string, options *ProducerOptions) (*partitionProducer, error) {
p := &partitionProducer{
- log: log.WithField("topic", topic),
+ log: log.WithField("topic", topic),
+ client: client,
+ topic: topic,
+ producerId: client.rpcClient.NewProducerId(),
+ eventsChan: make(chan interface{}),
+ }
+
+ if options.Name != "" {
+ p.producerName = &options.Name
}
err := p.grabCnx()
@@ -27,55 +42,95 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
log.WithError(err).Errorf("Failed to create producer")
return nil, err
} else {
- log.Info("Created producer")
+ log.Info("Created producer on cnx: ")
+ go p.run()
return p, nil
}
}
func (p *partitionProducer) grabCnx() error {
- //lr, err := p.client.lookupService.Lookup(p.topic)
- //if err != nil {
- // p.log.WithError(err).Warn("Failed to lookup topic")
- // return err
- //}
-
- //id := p.client.rpcClient.NewRequestId()
- //p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
- //
- //})
- //
- //var cnx impl.Connection
- //cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
- //if err != nil {
- // p.log.WithError(err).Warn("Failed to get connection")
- // return err
- //}
-
- //cnx.
+ lr, err := p.client.lookupService.Lookup(p.topic)
+ if err != nil {
+ p.log.WithError(err).Warn("Failed to lookup topic")
+ return err
+ }
+
+ p.log.Info("Lookup result: ", lr)
+ id := p.client.rpcClient.NewRequestId()
+ res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, &pb.CommandProducer{
+ RequestId: &id,
+ Topic: &p.topic,
+ Encrypted: nil,
+ Metadata: nil,
+ ProducerId: &p.producerId,
+ ProducerName: p.producerName,
+ Schema: nil,
+ })
+
+ if err != nil {
+ p.log.WithError(err).Error("Failed to create producer")
+ return err
+ }
+ p.producerName = res.Response.ProducerSuccess.ProducerName
+ p.cnx = res.Cnx
+ p.log.WithField("cnx", res.Cnx).Info("Created producer")
return nil
}
func (p *partitionProducer) run() {
-
+ for {
+ i := <-p.eventsChan
+ switch v := i.(type) {
+ case *sendRequest:
+ p.log.Info("Received send request: ", v)
+ v.callback(nil, v.msg, nil)
+ }
+ }
}
func (p *partitionProducer) Topic() string {
- return ""
+ return p.topic
}
func (p *partitionProducer) Name() string {
- return ""
+ return *p.producerName
}
-func (p *partitionProducer) Send(context.Context, ProducerMessage) error {
+func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) error {
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+
+ var err error
+
+ p.SendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
+ err = e
+ wg.Done()
+ })
+
+ // When sending synchronously we flush immediately to avoid
+ // the increased latency and reduced throughput of batching
+ if err = p.Flush(); err != nil {
+ return err
+ }
+
+ wg.Wait()
+ return err
return nil
}
-func (p *partitionProducer) SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) {
+type sendRequest struct {
+ ctx context.Context
+ msg *ProducerMessage
+ callback func(MessageID, *ProducerMessage, error)
+}
+
+func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
+ p.eventsChan <- &sendRequest{ctx, msg, callback}
}
func (p *partitionProducer) LastSequenceID() int64 {
+ // TODO: return real last sequence id
return -1
}
@@ -84,5 +139,6 @@ func (p *partitionProducer) Flush() error {
}
func (p *partitionProducer) Close() error {
+ p.log.Info("Closing producer")
return nil
}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 587b09d..abedfab 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -2,18 +2,16 @@ package pulsar
import (
"context"
- "fmt"
"pulsar-client-go-native/pulsar/impl"
- "sync"
)
type producer struct {
topic string
producers []Producer
- messageRouter func(ProducerMessage, TopicMetadata) int
+ messageRouter func(*ProducerMessage, TopicMetadata) int
}
-func newProducer(client *client, options ProducerOptions) (*producer, error) {
+func newProducer(client *client, options *ProducerOptions) (*producer, error) {
if options.Topic == "" {
return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
}
@@ -24,7 +22,7 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
if options.MessageRouter == nil {
internalRouter := impl.NewDefaultRouter(options.BatchingMaxPublishDelay)
- p.messageRouter = func(message ProducerMessage, metadata TopicMetadata) int {
+ p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
return internalRouter(metadata.NumPartitions())
}
}
@@ -45,12 +43,10 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
c := make(chan ProducerError, numPartitions)
- for i := 0; i < numPartitions; i++ {
- partition := i
+ for partitionIdx, partition := range partitions {
go func() {
- partitionName := fmt.Sprintf("%s-partition-%d", options.Topic, partition)
- prod, err := newPartitionProducer(client, partitionName, &options)
- c <- ProducerError{partition, prod, err}
+ prod, err := newPartitionProducer(client, partition, options)
+ c <- ProducerError{partitionIdx, prod, err}
}()
}
@@ -85,22 +81,12 @@ func (p *producer) NumPartitions() uint32 {
return uint32(len(p.producers))
}
-func (p *producer) Send(ctx context.Context, msg ProducerMessage) error {
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- var err error
-
- p.SendAsync(ctx, msg, func(message ProducerMessage, e error) {
- err = e
- wg.Done()
- })
-
- wg.Wait()
- return err
+func (p *producer) Send(ctx context.Context, msg *ProducerMessage) error {
+ partition := p.messageRouter(msg, p)
+ return p.producers[partition].Send(ctx, msg)
}
-func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
+func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
partition := p.messageRouter(msg, p)
p.producers[partition].SendAsync(ctx, msg, callback)
}
@@ -117,9 +103,21 @@ func (p *producer) LastSequenceID() int64 {
}
func (p *producer) Flush() error {
- return nil
+ var err error = nil
+ for _, pp := range p.producers {
+ if e := pp.Flush(); e != nil && err == nil {
+ err = e
+ }
+ }
+ return err
}
func (p *producer) Close() error {
- return nil
+ var err error = nil
+ for _, pp := range p.producers {
+ if e := pp.Close(); e != nil && err == nil {
+ err = e
+ }
+ }
+ return err
}
diff --git a/pulsar/message.go b/pulsar/message.go
index ad61704..60dd989 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -83,8 +83,8 @@ func DeserializeMessageID(data []byte) MessageID {
var (
// MessageID that points to the earliest message available in a topic
-//EarliestMessage MessageID = earliestMessageID()
+// TODO: EarliestMessage MessageID = earliestMessageID()
// MessageID that points to the latest message
-//LatestMessage MessageID = latestMessageID()
+// TODO: LatestMessage MessageID = latestMessageID()
)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 90769cb..0e59e66 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -161,12 +161,12 @@ type Producer interface {
// This call blocks until the message is successfully acknowledged by the Pulsar broker.
// Example:
// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
- Send(context.Context, ProducerMessage) error
+ Send(context.Context, *ProducerMessage) error
// Send a message in asynchronous mode
// The callback will report back the message being published and
// the eventual error in publishing
- SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
+ SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
// Get the last sequence id that was published by this producer.
// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that