Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:46 UTC

[pulsar-client-go] 09/38: Basic publishing works

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit f7cac7a6506b0091b9a5a291c48b968603a1c07a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Apr 5 15:38:20 2019 -0700

    Basic publishing works
---
 pulsar/impl/batch_builder.go      | 92 +++++++++++++++++++++++++++++++++++---
 pulsar/impl/buffer.go             | 39 ++++++++++++++++
 pulsar/impl/checksum.go           |  9 ++++
 pulsar/impl/commands.go           | 61 +++++++++++++++++++++++++
 pulsar/impl/connection.go         | 42 +++++++++++++----
 pulsar/impl/utils.go              | 20 +++++++++
 pulsar/impl_partition_producer.go | 94 +++++++++++++++++++++++++++++++++------
 pulsar/message.go                 |  4 +-
 8 files changed, 331 insertions(+), 30 deletions(-)

diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 7eb2030..6b0df8e 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -1,23 +1,101 @@
 package impl
 
+import (
+	"github.com/golang/protobuf/proto"
+	log "github.com/sirupsen/logrus"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+	"time"
+)
+
+const MaxMessageSize = 5 * 1024 * 1024
+
+const MaxBatchSize = 128 * 1024
+
+const DefaultMaxMessagesPerBatch = 1000
+
 type BatchBuilder struct {
 	buffer Buffer
+
+	// Current number of messages in the batch
+	numMessages uint
+
+	// Max number of messages allowed in the batch
+	maxMessages uint
+
+	producerName string
+	producerId   uint64
+
+	cmdSend     *pb.BaseCommand
+	msgMetadata *pb.MessageMetadata
 }
 
-func NewBatchBuilder() *BatchBuilder {
+func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *BatchBuilder {
+	if maxMessages == 0 {
+		maxMessages = DefaultMaxMessagesPerBatch
+	}
 	return &BatchBuilder{
-		buffer: NewBuffer(4096),
+		buffer:       NewBuffer(4096),
+		numMessages:  0,
+		maxMessages:  maxMessages,
+		producerName: producerName,
+		producerId:   producerId,
+		cmdSend: baseCommand(pb.BaseCommand_SEND,
+			&pb.CommandSend{
+				ProducerId: &producerId,
+			}),
+		msgMetadata: &pb.MessageMetadata{
+			ProducerName: &producerName,
+		},
 	}
 }
 
-func (bb *BatchBuilder) isFull() bool {
-	return false
+func (bb *BatchBuilder) IsFull() bool {
+	return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
+}
+
+// hasSpace reports whether the batch can still accommodate the given payload
+func (bb *BatchBuilder) hasSpace(payload []byte) bool {
+	msgSize := uint32(len(payload))
+	return bb.numMessages == 0 || bb.buffer.ReadableBytes()+msgSize <= MaxBatchSize
 }
 
-func (bb *BatchBuilder) hasSpace(size int) bool {
-	return false
+func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte) bool {
+	if !bb.hasSpace(payload) {
+		// The current batch is full. The producer has to call Flush()
+		// to send it out before adding more messages.
+		return false
+	}
+
+	if bb.numMessages == 0 {
+		bb.msgMetadata.SequenceId = proto.Uint64(sequenceId)
+		bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
+		bb.msgMetadata.ProducerName = &bb.producerName
+
+		bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceId)
+	}
+	serializeSingleMessage(bb.buffer, metadata, payload)
+
+	bb.numMessages++
+	return true
+}
+
+func (bb *BatchBuilder) reset() {
+	bb.numMessages = 0
+	bb.buffer.Clear()
 }
 
 func (bb *BatchBuilder) Flush() []byte {
-	return nil
+	log.Info("BatchBuilder flush: messages: ", bb.numMessages)
+	if bb.numMessages == 0 {
+		// No-Op for empty batch
+		return nil
+	}
+
+	bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
+	bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
+
+	buffer := NewBuffer(4096)
+	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer.ReadableSlice())
+
+	bb.reset()
+	return buffer.ReadableSlice()
 }
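
The intended calling pattern is: keep calling Add() until it reports the batch is full, then Flush() and retry. A minimal sketch of a caller, assuming smm, sequenceId and payload are prepared as in internalSend further down, and a hypothetical sendToBroker transport helper:

    builder := impl.NewBatchBuilder(1000, "my-producer", 1)
    for !builder.Add(smm, sequenceId, payload) {
        // Batch is full: serialize it into a frame and start a new one.
        if frame := builder.Flush(); frame != nil {
            sendToBroker(frame) // hypothetical transport call
        }
    }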
diff --git a/pulsar/impl/buffer.go b/pulsar/impl/buffer.go
index 8e81f0c..3d03002 100644
--- a/pulsar/impl/buffer.go
+++ b/pulsar/impl/buffer.go
@@ -13,6 +13,8 @@ type Buffer interface {
 
 	Read(size uint32) []byte
 
+	Get(readerIndex uint32, size uint32) []byte
+
 	ReadableSlice() []byte
 
 	WritableSlice() []byte
@@ -23,12 +25,20 @@ type Buffer interface {
 	// Copy the available portion of data at the beginning of the buffer
 	MoveToFront()
 
+	ReadUint16() uint16
 	ReadUint32() uint32
 
+	WriteUint16(n uint16)
 	WriteUint32(n uint32)
 
+	WriterIndex() uint32
+	ReaderIndex() uint32
+
 	Write(s []byte)
 
+	Put(writerIdx uint32, s []byte)
+	PutUint32(n uint32, writerIdx uint32)
+
 	Resize(newSize uint32)
 
 	Clear()
@@ -71,6 +81,10 @@ func (b *buffer) Read(size uint32) []byte {
 	return res
 }
 
+func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
+	return b.data[readerIdx : readerIdx+size]
+}
+
 func (b *buffer) ReadableSlice() []byte {
 	return b.data[b.readerIdx:b.writerIdx]
 }
@@ -83,6 +97,14 @@ func (b *buffer) WrittenBytes(size uint32) {
 	b.writerIdx += size
 }
 
+func (b *buffer) WriterIndex() uint32 {
+	return b.writerIdx
+}
+
+func (b *buffer) ReaderIndex() uint32 {
+	return b.readerIdx
+}
+
 func (b *buffer) MoveToFront() {
 	size := b.ReadableBytes()
 	copy(b.data, b.Read(size))
@@ -103,16 +125,33 @@ func (b *buffer) ReadUint32() uint32 {
 	return binary.BigEndian.Uint32(b.Read(4))
 }
 
+func (b *buffer) ReadUint16() uint16 {
+	return binary.BigEndian.Uint16(b.Read(2))
+}
+
 func (b *buffer) WriteUint32(n uint32) {
 	binary.BigEndian.PutUint32(b.WritableSlice(), n)
 	b.writerIdx += 4
 }
 
+func (b *buffer) PutUint32(n uint32, idx uint32) {
+	binary.BigEndian.PutUint32(b.data[idx:], n)
+}
+
+func (b *buffer) WriteUint16(n uint16) {
+	binary.BigEndian.PutUint16(b.WritableSlice(), n)
+	b.writerIdx += 2
+}
+
 func (b *buffer) Write(s []byte) {
 	copy(b.WritableSlice(), s)
 	b.writerIdx += uint32(len(s))
 }
 
+func (b *buffer) Put(writerIdx uint32, s []byte) {
+	copy(b.data[writerIdx:], s)
+}
+
 func (b *buffer) Clear() {
 	b.readerIdx = 0
 	b.writerIdx = 0
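
The new WriterIndex(), Put() and PutUint32() accessors support a reserve-then-backpatch pattern: write a placeholder, keep appending, then fill the placeholder in once the later bytes are known. A minimal sketch of that pattern (the 0x0e01 magic and 4-byte checksum mirror the framing added below):

    payload := []byte("some payload bytes")
    buf := impl.NewBuffer(128)
    buf.WriteUint16(0x0e01)          // magic number
    checksumIdx := buf.WriterIndex() // remember where the checksum goes
    buf.WriteUint32(0)               // 4-byte placeholder
    buf.Write(payload)               // bytes the checksum must cover
    covered := buf.Get(checksumIdx+4, buf.WriterIndex()-checksumIdx-4)
    buf.PutUint32(impl.Crc32cCheckSum(covered), checksumIdx)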
diff --git a/pulsar/impl/checksum.go b/pulsar/impl/checksum.go
new file mode 100644
index 0000000..cbea686
--- /dev/null
+++ b/pulsar/impl/checksum.go
@@ -0,0 +1,9 @@
+package impl
+
+import "hash/crc32"
+
+var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
+
+func Crc32cCheckSum(data []byte) uint32 {
+	return crc32.Checksum(data, crc32cTable)
+}
\ No newline at end of file
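
Note the Castagnoli table: Pulsar frames are checksummed with CRC-32C, not the IEEE polynomial used by crc32.ChecksumIEEE. A quick sanity check against the standard CRC-32C test vector:

    // Per RFC 3720 (iSCSI), the CRC-32C check value for "123456789" is 0xE3069283.
    if Crc32cCheckSum([]byte("123456789")) != 0xE3069283 {
        panic("unexpected CRC-32C result")
    }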
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index ed9f6b9..6b1c840 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -8,6 +8,8 @@ import (
 
 const MaxFrameSize = 5 * 1024 * 1024
 
+const magicCrc32c uint16 = 0x0e01
+
 func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
 	cmd := &pb.BaseCommand{
 		Type: &cmdType,
@@ -25,6 +27,8 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
 		cmd.Ping = msg.(*pb.CommandPing)
 	case pb.BaseCommand_PONG:
 		cmd.Pong = msg.(*pb.CommandPong)
+	case pb.BaseCommand_SEND:
+		cmd.Send = msg.(*pb.CommandSend)
 	default:
 		log.Panic("Missing command type: ", cmdType)
 	}
@@ -32,6 +36,63 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
 	return cmd
 }
 
+func serializeSingleMessage(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
+	serialized, err := proto.Marshal(smm)
+	if err != nil {
+		log.WithError(err).Fatal("Protobuf serialization error")
+	}
+
+	wb.Write(serialized)
+	wb.Write(payload)
+}
+
+func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
+	// Wire format
+	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+	cmdSize := proto.Size(cmdSend)
+	msgMetadataSize := proto.Size(msgMetadata)
+	payloadSize := len(payload)
+
+	magicAndChecksumLength := 2 + 4 /* magic + checksumLength */
+	headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize
+	// cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize
+	totalSize := headerContentSize + payloadSize
+
+	wb.WriteUint32(uint32(totalSize)) // Total frame size, excluding this 4-byte field
+
+	// Write cmd
+	wb.WriteUint32(uint32(cmdSize))
+	serialized, err := proto.Marshal(cmdSend)
+	if err != nil {
+		log.WithError(err).Fatal("Protobuf error when serializing cmdSend")
+	}
+
+	wb.Write(serialized)
+
+	// Create checksum placeholder
+	wb.WriteUint16(magicCrc32c)
+	checksumIdx := wb.WriterIndex()
+	wb.WriteUint32(0) // reserve 4 bytes for the checksum, filled in below
+
+	// Write metadata
+	metadataStartIdx := wb.WriterIndex()
+	wb.WriteUint32(uint32(msgMetadataSize))
+	serialized, err = proto.Marshal(msgMetadata)
+	if err != nil {
+		log.WithError(err).Fatal("Protobuf error when serializing msgMetadata")
+	}
+
+	wb.Write(serialized)
+	wb.Write(payload)
+
+	// Compute the CRC32-C over everything that follows the checksum field
+	endIdx := wb.WriterIndex()
+	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
+
+	// Backpatch it into the placeholder reserved above
+	wb.PutUint32(checksum, checksumIdx)
+}
+
 func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
 	list := make([]*pb.KeyValue, len(m))
 
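To make the size arithmetic in serializeBatch concrete, take a hypothetical frame with a 10-byte command, 20-byte metadata and a 100-byte payload:

    totalSize = 4 (CMD_SIZE field) + 10 (CMD)
              + 2 (MAGIC_NUMBER) + 4 (CHECKSUM)
              + 4 (METADATA_SIZE field) + 20 (METADATA)
              + 100 (PAYLOAD)
              = 144

The 4-byte TOTAL_SIZE field itself is not counted. The CRC32-C covers only the bytes from METADATA_SIZE to the end of the payload, which is why metadataStartIdx is recorded before the metadata length is written.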
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 365f1b3..04fa10f 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -14,6 +14,7 @@ import (
 
 type Connection interface {
 	SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+	WriteData(data []byte)
 	Close()
 }
 
@@ -54,6 +55,7 @@ type connection struct {
 	requestIdGenerator uint64
 
 	incomingRequests chan *request
+	writeRequests    chan []byte
 	pendingReqs      map[uint64]*request
 }
 
@@ -67,6 +69,9 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
 		pendingReqs:          make(map[uint64]*request),
 		lastDataReceivedTime: time.Now(),
 		pingTicker:           time.NewTicker(keepAliveInterval),
+
+		incomingRequests: make(chan *request),
+		writeRequests:    make(chan []byte),
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -164,12 +169,27 @@ func (c *connection) run() {
 			c.pendingReqs[req.id] = req
 			c.writeCommand(req.cmd)
 
+		case data := <-c.writeRequests:
+			c.internalWriteData(data)
+
 		case _ = <-c.pingTicker.C:
 			c.sendPing()
 		}
 	}
 }
 
+func (c *connection) WriteData(data []byte) {
+	c.writeRequests <- data
+}
+
+func (c *connection) internalWriteData(data []byte) {
+	c.log.Info("Write data: ", len(data))
+	if _, err := c.cnx.Write(data); err != nil {
+		c.log.WithError(err).Warn("Failed to write on connection")
+		c.Close()
+	}
+}
+
 func (c *connection) writeCommand(cmd proto.Message) {
 	// Wire format
 	// [FRAME_SIZE] [CMD_SIZE][CMD]
@@ -190,11 +210,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
 	}
 
 	c.writeBuffer.Write(serialized)
-
-	if _, err := c.cnx.Write(c.writeBuffer.ReadableSlice()); err != nil {
-		c.log.WithError(err).Warn("Failed to write on connection")
-		c.Close()
-	}
+	c.internalWriteData(c.writeBuffer.ReadableSlice())
 }
 
 func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
@@ -230,6 +246,8 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 	case pb.BaseCommand_CLOSE_PRODUCER:
 	case pb.BaseCommand_CLOSE_CONSUMER:
 	case pb.BaseCommand_SEND_RECEIPT:
+		c.log.Info("Got SEND_RECEIPT: ", cmd.GetSendReceipt())
+
 	case pb.BaseCommand_SEND_ERROR:
 
 	case pb.BaseCommand_MESSAGE:
@@ -246,13 +264,21 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 	}
 }
 
 func (c *connection) SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand)) {
-	c.pendingReqs[requestId] = &request{
+	c.incomingRequests <- &request{
 		id:       requestId,
 		cmd:      req,
 		callback: callback,
 	}
-	c.writeCommand(req)
+}
+
+func (c *connection) internalSendRequest(req *request) {
+	c.pendingReqs[req.id] = req
+	c.writeCommand(req.cmd)
 }
 
 func (c *connection) handleResponse(requestId uint64, response *pb.BaseCommand) {
@@ -299,6 +325,7 @@ func (c *connection) Close() {
 		c.cnx.Close()
 		c.pingTicker.Stop()
 		close(c.incomingRequests)
+		close(c.writeRequests)
 		c.cnx = nil
 	}
 }
@@ -313,4 +340,3 @@ func (c *connection) changeState(state connectionState) {
 func (c *connection) newRequestId() uint64 {
 	return atomic.AddUint64(&c.requestIdGenerator, 1)
 }
-
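
This refactoring funnels all socket writes through the run() event loop: SendRequest() and WriteData() only post to channels, and run() is the single goroutine that touches pendingReqs and the connection, so neither needs a lock. The pattern in isolation, as a minimal sketch with hypothetical names and types trimmed to the essentials:

    type eventLoop struct {
        writeCh chan []byte
    }

    func (l *eventLoop) run(w io.Writer) {
        for data := range l.writeCh {
            w.Write(data) // only this goroutine ever writes
        }
    }

    // Safe to call from any goroutine.
    func (l *eventLoop) WriteData(data []byte) { l.writeCh <- data }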
diff --git a/pulsar/impl/utils.go b/pulsar/impl/utils.go
new file mode 100644
index 0000000..02d15b3
--- /dev/null
+++ b/pulsar/impl/utils.go
@@ -0,0 +1,20 @@
+package impl
+
+import (
+	"sync/atomic"
+	"time"
+)
+
+func TimestampMillis(t time.Time) uint64 {
+	return uint64(t.UnixNano()) / uint64(time.Millisecond)
+}
+
+// GetAndAdd atomically adds diff to *n and returns the value held before the add
+func GetAndAdd(n *uint64, diff uint64) uint64 {
+	for {
+		v := atomic.LoadUint64(n)
+		if atomic.CompareAndSwapUint64(n, v, v+diff) {
+			return v
+		}
+	}
+}
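
GetAndAdd is a compare-and-swap loop that returns the value before the add; Go's atomic.AddUint64 returns the value after it, so an equivalent one-liner would be:

    old := atomic.AddUint64(n, diff) - diff

The producer below uses it to hand out message sequence ids, starting from the broker-reported last sequence id plus one.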
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 7bb2165..2677770 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -2,10 +2,12 @@ package pulsar
 
 import (
 	"context"
+	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
 	"pulsar-client-go-native/pulsar/impl"
 	pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"sync"
+	"time"
 )
 
 type partitionProducer struct {
@@ -16,21 +18,36 @@ type partitionProducer struct {
 	cond   *sync.Cond
 	cnx    impl.Connection
 
-	producerName *string
-	producerId   uint64
+	options             *ProducerOptions
+	producerName        *string
+	producerId          uint64
+	batchBuilder        *impl.BatchBuilder
+	sequenceIdGenerator *uint64
+	batchFlushTicker    *time.Ticker
 
 	// Channel where app is posting messages to be published
 	eventsChan chan interface{}
 }
 
+const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
+
 func newPartitionProducer(client *client, topic string, options *ProducerOptions) (*partitionProducer, error) {
 
+	var batchingMaxPublishDelay time.Duration
+	if options.BatchingMaxPublishDelay != 0 {
+		batchingMaxPublishDelay = options.BatchingMaxPublishDelay
+	} else {
+		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
+	}
+
 	p := &partitionProducer{
-		log:        log.WithField("topic", topic),
-		client:     client,
-		topic:      topic,
-		producerId: client.rpcClient.NewProducerId(),
-		eventsChan: make(chan interface{}),
+		log:              log.WithField("topic", topic),
+		client:           client,
+		topic:            topic,
+		options:          options,
+		producerId:       client.rpcClient.NewProducerId(),
+		eventsChan:       make(chan interface{}),
+		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 	}
 
 	if options.Name != "" {
@@ -74,6 +91,13 @@ func (p *partitionProducer) grabCnx() error {
 	}
 
 	p.producerName = res.Response.ProducerSuccess.ProducerName
+	if p.batchBuilder == nil {
+		p.batchBuilder = impl.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName, p.producerId)
+	}
+	if p.sequenceIdGenerator == nil {
+		nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
+		p.sequenceIdGenerator = &nextSequenceId
+	}
 	p.cnx = res.Cnx
 	p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
 	return nil
@@ -81,11 +105,15 @@ func (p *partitionProducer) grabCnx() error {
 
 func (p *partitionProducer) run() {
 	for {
-		i := <-p.eventsChan
-		switch v := i.(type) {
-		case *sendRequest:
-			p.log.Debug("Received send request: ", v)
-			v.callback(nil, v.msg, nil)
+		select {
+		case i := <-p.eventsChan:
+			switch v := i.(type) {
+			case *sendRequest:
+				p.internalSend(v)
+			}
+
+		case <-p.batchFlushTicker.C:
+			p.internalFlush()
 		}
 	}
 }
@@ -98,6 +126,47 @@ func (p *partitionProducer) Name() string {
 	return *p.producerName
 }
 
+func (p *partitionProducer) internalSend(request *sendRequest) {
+	p.log.Debug("Received send request: ", *request)
+
+	msg := request.msg
+
+	if msg.ReplicationClusters == nil {
+		smm := &pb.SingleMessageMetadata{
+			PayloadSize: proto.Int(len(msg.Payload)),
+		}
+
+		if msg.EventTime != nil {
+			smm.EventTime = proto.Uint64(impl.TimestampMillis(*msg.EventTime))
+		}
+
+		if msg.Key != "" {
+			smm.PartitionKey = &msg.Key
+		}
+
+		if msg.Properties != nil {
+			smm.Properties = impl.ConvertFromStringMap(msg.Properties)
+		}
+
+		sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
+		for !p.batchBuilder.Add(smm, sequenceId, msg.Payload) {
+			// The current batch is full: flush it and retry the add
+			p.internalFlush()
+		}
+	} else {
+		p.log.Panic("TODO: serialize into single message")
+	}
+}
+
+func (p *partitionProducer) internalFlush() {
+	batchData := p.batchBuilder.Flush()
+	if batchData == nil {
+		return
+	}
+
+	p.cnx.WriteData(batchData)
+}
+
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) error {
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -117,7 +186,6 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 
 	wg.Wait()
 	return err
-	return nil
 }
 
 type sendRequest struct {
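
Publishing is now asynchronous end to end: Send() posts a sendRequest on eventsChan, run() folds it into the current batch, and a batch is written out either when the next message would push it past MaxBatchSize (the retry loop in internalSend) or when batchFlushTicker fires, every BatchingMaxPublishDelay (10ms by default). Under light load the ticker dominates, so a single message waits at most one flush interval before hitting the wire.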
diff --git a/pulsar/message.go b/pulsar/message.go
index 60dd989..12bc269 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -32,13 +32,13 @@ type ProducerMessage struct {
 	Properties map[string]string
 
 	// Set the event time for a given message
-	EventTime time.Time
+	EventTime *time.Time
 
 	// Override the replication clusters for this message.
 	ReplicationClusters []string
 
 	// Set the sequence id to assign to the current message
-	SequenceID int64
+	SequenceID *int64
 }
 
 type Message interface {
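
Turning EventTime and SequenceID into pointers makes both fields optional, with nil meaning "not set" rather than overloading the zero value. Setting them now requires an addressable variable, e.g. for a hypothetical message:

    eventTime := time.Now()
    msg := &pulsar.ProducerMessage{
        Payload:   []byte("hello"),
        EventTime: &eventTime, // nil = no event time attached
    }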