You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:46 UTC
[pulsar-client-go] 09/38: Basic publishing works
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit f7cac7a6506b0091b9a5a291c48b968603a1c07a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Apr 5 15:38:20 2019 -0700
Basic publishing works
---
pulsar/impl/batch_builder.go | 92 +++++++++++++++++++++++++++++++++++---
pulsar/impl/buffer.go | 39 ++++++++++++++++
pulsar/impl/checksum.go | 9 ++++
pulsar/impl/commands.go | 61 +++++++++++++++++++++++++
pulsar/impl/connection.go | 42 +++++++++++++----
pulsar/impl/utils.go | 20 +++++++++
pulsar/impl_partition_producer.go | 94 +++++++++++++++++++++++++++++++++------
pulsar/message.go | 4 +-
8 files changed, 331 insertions(+), 30 deletions(-)
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 7eb2030..6b0df8e 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -1,23 +1,101 @@
package impl
+import (
+ "github.com/golang/protobuf/proto"
+ log "github.com/sirupsen/logrus"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+ "time"
+)
+
+// Size and count limits used when assembling batches.
+const (
+	// MaxMessageSize is 5 MiB.
+	MaxMessageSize = 5 * 1024 * 1024
+
+	// MaxBatchSize is the byte threshold (128 KiB) that IsFull and hasSpace
+	// compare the batch buffer against.
+	MaxBatchSize = 128 * 1024
+
+	// DefaultMaxMessagesPerBatch is the limit NewBatchBuilder applies when
+	// it is called with maxMessages == 0.
+	DefaultMaxMessagesPerBatch = 1000
+)
+
+// BatchBuilder accumulates serialized messages in an internal buffer and
+// turns them into a single SEND frame when Flush is called.
type BatchBuilder struct {
buffer Buffer
+
+ // Current number of messages in the batch
+ numMessages uint
+
+ // Max number of messages allowed in the batch
+ maxMessages uint
+
+ // Producer identity, as passed to NewBatchBuilder
+ producerName string
+ producerId uint64
+
+ // Command and metadata objects created once by NewBatchBuilder and
+ // updated in place by Add and Flush
+ cmdSend *pb.BaseCommand
+ msgMetadata *pb.MessageMetadata
}
-func NewBatchBuilder() *BatchBuilder {
+// NewBatchBuilder creates an empty BatchBuilder for the given producer.
+// Passing maxMessages == 0 selects DefaultMaxMessagesPerBatch. The SEND
+// command and the message metadata are pre-populated with the producer id
+// and producer name respectively.
+func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *BatchBuilder {
+ if maxMessages == 0 {
+ maxMessages = DefaultMaxMessagesPerBatch
+ }
return &BatchBuilder{
- buffer: NewBuffer(4096),
+ buffer: NewBuffer(4096),
+ numMessages: 0,
+ maxMessages: maxMessages,
+ producerName: producerName,
+ producerId: producerId,
+ cmdSend: baseCommand(pb.BaseCommand_SEND,
+ &pb.CommandSend{
+ ProducerId: &producerId,
+ }),
+ msgMetadata: &pb.MessageMetadata{
+ ProducerName: &producerName,
+ },
}
}
-func (bb *BatchBuilder) isFull() bool {
- return false
+// IsFull reports whether the batch has reached its message-count limit or
+// holds more than MaxBatchSize bytes of buffered data.
+func (bb *BatchBuilder) IsFull() bool {
+	if bb.numMessages >= bb.maxMessages {
+		return true
+	}
+	return bb.buffer.ReadableBytes() > MaxBatchSize
+}
+
+// hasSpace reports whether payload can be appended to the current batch.
+//
+// An empty batch always accepts a message, whatever its size. Otherwise the
+// batch must hold fewer than maxMessages messages and must stay within
+// MaxBatchSize bytes once payload is added.
+func (bb *BatchBuilder) hasSpace(payload []byte) bool {
+	if bb.numMessages == 0 {
+		return true
+	}
+	if bb.numMessages >= bb.maxMessages {
+		return false
+	}
+	msgSize := uint32(len(payload))
+	return bb.buffer.ReadableBytes()+msgSize <= MaxBatchSize
}
-func (bb *BatchBuilder) hasSpace(size int) bool {
- return false
+// Add appends one message to the current batch.
+//
+// It returns false, leaving the batch untouched, when the batch has no room
+// for the message; the caller is expected to call Flush and then retry.
+// The first message of a batch also stamps the batch-level metadata
+// (sequence id, publish time, producer name) and the sequence id of the
+// SEND command.
+func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte) bool {
+	if !bb.hasSpace(payload) {
+		// The current batch is full. Producer has to call Flush() to
+		// send it out before this message can be added.
+		return false
+	}
+
+	if bb.numMessages == 0 {
+		bb.msgMetadata.SequenceId = proto.Uint64(sequenceId)
+		bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
+		bb.msgMetadata.ProducerName = &bb.producerName
+
+		bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceId)
+	}
+	serializeSingleMessage(bb.buffer, metadata, payload)
+
+	bb.numMessages++
+	return true
+}
+
+// reset empties the builder so that the next Add starts a new batch.
+func (bb *BatchBuilder) reset() {
+	bb.buffer.Clear()
+	bb.numMessages = 0
}
+// Flush serializes the accumulated messages into one SEND frame, resets the
+// builder, and returns the frame bytes. It returns nil when the batch is
+// empty.
+//
+// The log line is emitted only for non-empty batches and at Debug level:
+// the producer calls Flush from a periodic ticker, so logging every call at
+// Info level would flood the log with empty-batch entries.
func (bb *BatchBuilder) Flush() []byte {
- return nil
+	if bb.numMessages == 0 {
+		// No-Op for empty batch
+		return nil
+	}
+	log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
+
+	bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
+	bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
+
+	// NOTE(review): the frame buffer starts at 4096 bytes while a batch may
+	// hold up to MaxBatchSize bytes (or one larger message). Confirm that
+	// Buffer grows on write; the Write implementation visible in this patch
+	// does not appear to resize.
+	buffer := NewBuffer(4096)
+	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer.ReadableSlice())
+
+	bb.reset()
+	return buffer.ReadableSlice()
}
diff --git a/pulsar/impl/buffer.go b/pulsar/impl/buffer.go
index 8e81f0c..3d03002 100644
--- a/pulsar/impl/buffer.go
+++ b/pulsar/impl/buffer.go
@@ -13,6 +13,8 @@ type Buffer interface {
Read(size uint32) []byte
+ Get(readerIndex uint32, size uint32) []byte
+
ReadableSlice() []byte
WritableSlice() []byte
@@ -23,12 +25,20 @@ type Buffer interface {
// Copy the available portion of data at the beginning of the buffer
MoveToFront()
+ ReadUint16() uint16
ReadUint32() uint32
+ WriteUint16(n uint16)
WriteUint32(n uint32)
+ WriterIndex() uint32
+ ReaderIndex() uint32
+
Write(s []byte)
+ Put(writerIdx uint32, s []byte)
+ PutUint32(n uint32, writerIdx uint32)
+
Resize(newSize uint32)
Clear()
@@ -71,6 +81,10 @@ func (b *buffer) Read(size uint32) []byte {
return res
}
+// Get returns the size bytes of the underlying storage that start at
+// readerIdx. The buffer's own reader and writer indexes are neither
+// consulted nor modified.
+func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
+	end := readerIdx + size
+	return b.data[readerIdx:end]
+}
+
func (b *buffer) ReadableSlice() []byte {
return b.data[b.readerIdx:b.writerIdx]
}
@@ -83,6 +97,14 @@ func (b *buffer) WrittenBytes(size uint32) {
b.writerIdx += size
}
+// WriterIndex returns the buffer's current write offset.
+func (b *buffer) WriterIndex() uint32 {
+ return b.writerIdx
+}
+
+// ReaderIndex returns the buffer's current read offset.
+func (b *buffer) ReaderIndex() uint32 {
+ return b.readerIdx
+}
+
func (b *buffer) MoveToFront() {
size := b.ReadableBytes()
copy(b.data, b.Read(size))
@@ -103,16 +125,33 @@ func (b *buffer) ReadUint32() uint32 {
return binary.BigEndian.Uint32(b.Read(4))
}
+// ReadUint16 consumes two bytes through Read and decodes them as a
+// big-endian uint16.
+func (b *buffer) ReadUint16() uint16 {
+	raw := b.Read(2)
+	return binary.BigEndian.Uint16(raw)
+}
+
func (b *buffer) WriteUint32(n uint32) {
binary.BigEndian.PutUint32(b.WritableSlice(), n)
b.writerIdx += 4
}
+// PutUint32 stores n as a big-endian uint32 at absolute offset idx of the
+// underlying storage. The writer index is left unchanged.
+func (b *buffer) PutUint32(n uint32, idx uint32) {
+	dst := b.data[idx:]
+	binary.BigEndian.PutUint32(dst, n)
+}
+
+// WriteUint16 encodes n as a big-endian uint16 at the start of the writable
+// region and advances the writer index by two bytes.
+func (b *buffer) WriteUint16(n uint16) {
+	dst := b.WritableSlice()
+	binary.BigEndian.PutUint16(dst, n)
+	b.writerIdx += 2
+}
+
func (b *buffer) Write(s []byte) {
copy(b.WritableSlice(), s)
b.writerIdx += uint32(len(s))
}
+// Put copies s into the underlying storage starting at absolute offset
+// writerIdx. The buffer's writer index is left unchanged.
+func (b *buffer) Put(writerIdx uint32, s []byte) {
+	dst := b.data[writerIdx:]
+	copy(dst, s)
+}
+
func (b *buffer) Clear() {
b.readerIdx = 0
b.writerIdx = 0
diff --git a/pulsar/impl/checksum.go b/pulsar/impl/checksum.go
new file mode 100644
index 0000000..cbea686
--- /dev/null
+++ b/pulsar/impl/checksum.go
@@ -0,0 +1,9 @@
+package impl
+
+import "hash/crc32"
+
+// crc32cTable is the CRC-32 lookup table for the Castagnoli polynomial.
+var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
+
+// Crc32cCheckSum returns the CRC-32C (Castagnoli) checksum of data.
+func Crc32cCheckSum(data []byte) uint32 {
+	return crc32.Update(0, crc32cTable, data)
+}
\ No newline at end of file
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index ed9f6b9..6b1c840 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -8,6 +8,8 @@ import (
const MaxFrameSize = 5 * 1024 * 1024
+// magicCrc32c is the 2-byte marker that serializeBatch writes immediately
+// before the CRC32C checksum field of a frame.
+const magicCrc32c uint16 = 0x0e01
+
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
cmd := &pb.BaseCommand{
Type: &cmdType,
@@ -25,6 +27,8 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
cmd.Ping = msg.(*pb.CommandPing)
case pb.BaseCommand_PONG:
cmd.Pong = msg.(*pb.CommandPong)
+ case pb.BaseCommand_SEND:
+ cmd.Send = msg.(*pb.CommandSend)
default:
log.Panic("Missing command type: ", cmdType)
}
@@ -32,6 +36,63 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
return cmd
}
+// serializeSingleMessage appends one batch entry to wb.
+//
+// Entry layout:
+// [METADATA_SIZE][METADATA][PAYLOAD]
+//
+// METADATA_SIZE is the 4-byte big-endian length of the serialized
+// SingleMessageMetadata. The size prefix is required so that a reader can
+// tell where the metadata ends; the payload length itself is carried in the
+// metadata's PayloadSize field, which the caller sets.
+//
+// A protobuf serialization failure is logged at Fatal level.
+func serializeSingleMessage(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
+	serialized, err := proto.Marshal(smm)
+	if err != nil {
+		log.WithError(err).Fatal("Protobuf serialization error")
+	}
+
+	wb.WriteUint32(uint32(len(serialized)))
+	wb.Write(serialized)
+	wb.Write(payload)
+}
+
+// serializeBatch writes a complete SEND frame for a batch into wb.
+//
+// cmdSend is the command, msgMetadata the batch-level metadata, and payload
+// the already-serialized batch entries. The checksum field is first written
+// as a zero placeholder and filled in at the end, once the bytes it covers
+// have been written.
+//
+// A protobuf serialization failure is logged at Fatal level.
+//
+// NOTE(review): nothing here checks that wb has room for the frame; confirm
+// that the Buffer implementation grows on write.
+func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
+ // Wire format
+ // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+ cmdSize := proto.Size(cmdSend)
+ msgMetadataSize := proto.Size(msgMetadata)
+ payloadSize := len(payload)
+
+ magicAndChecksumLength := 2 + 4 /* magic + checksumLength */
+ headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize
+ // cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize
+ // totalSize counts everything after the TOTAL_SIZE field; the 4 bytes of
+ // TOTAL_SIZE itself are not included.
+ totalSize := headerContentSize + payloadSize
+
+ wb.WriteUint32(uint32(totalSize)) // External frame
+
+ // Write cmd
+ wb.WriteUint32(uint32(cmdSize))
+ serialized, err := proto.Marshal(cmdSend)
+ if err != nil {
+ log.WithError(err).Fatal("Protobuf error when serializing cmdSend")
+ }
+
+ wb.Write(serialized)
+
+ // Create checksum placeholder
+ wb.WriteUint16(magicCrc32c)
+ // Remember where the checksum goes so it can be patched in below.
+ checksumIdx := wb.WriterIndex()
+ wb.WriteUint32(0) // skip 4 bytes of checksum
+
+ // Write metadata
+ // The checksum covers everything from this index to the end of the frame.
+ metadataStartIdx := wb.WriterIndex()
+ wb.WriteUint32(uint32(msgMetadataSize))
+ serialized, err = proto.Marshal(msgMetadata)
+ if err != nil {
+ log.WithError(err).Fatal("Protobuf error when serializing msgMetadata")
+ }
+
+ wb.Write(serialized)
+ wb.Write(payload)
+
+ // Write checksum at created checksum-placeholder
+ // Checksummed bytes: [METADATA_SIZE][METADATA][PAYLOAD]
+ endIdx := wb.WriterIndex()
+ checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
+
+ // set computed checksum
+ wb.PutUint32(checksum, checksumIdx)
+}
+
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
list := make([]*pb.KeyValue, len(m))
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 365f1b3..04fa10f 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -14,6 +14,7 @@ import (
type Connection interface {
SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+ WriteData(data []byte)
Close()
}
@@ -54,6 +55,7 @@ type connection struct {
requestIdGenerator uint64
incomingRequests chan *request
+ writeRequests chan []byte
pendingReqs map[uint64]*request
}
@@ -67,6 +69,9 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
pendingReqs: make(map[uint64]*request),
lastDataReceivedTime: time.Now(),
pingTicker: time.NewTicker(keepAliveInterval),
+
+ incomingRequests: make(chan *request),
+ writeRequests: make(chan []byte),
}
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
@@ -164,12 +169,27 @@ func (c *connection) run() {
c.pendingReqs[req.id] = req
c.writeCommand(req.cmd)
+ case data := <-c.writeRequests:
+ c.internalWriteData(data)
+
case _ = <-c.pingTicker.C:
c.sendPing()
}
}
}
+// WriteData hands data to the connection's run loop over the writeRequests
+// channel; the loop then writes it through internalWriteData. The send
+// blocks until the run loop receives it.
+func (c *connection) WriteData(data []byte) {
+ c.writeRequests <- data
+}
+
+// internalWriteData writes data to the underlying connection. If the write
+// fails, the error is logged and the connection is closed.
+func (c *connection) internalWriteData(data []byte) {
+	c.log.Info("Write data: ", len(data))
+	_, err := c.cnx.Write(data)
+	if err == nil {
+		return
+	}
+	c.log.WithError(err).Warn("Failed to write on connection")
+	c.Close()
+}
+
func (c *connection) writeCommand(cmd proto.Message) {
// Wire format
// [FRAME_SIZE] [CMD_SIZE][CMD]
@@ -190,11 +210,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
}
c.writeBuffer.Write(serialized)
-
- if _, err := c.cnx.Write(c.writeBuffer.ReadableSlice()); err != nil {
- c.log.WithError(err).Warn("Failed to write on connection")
- c.Close()
- }
+ c.internalWriteData(c.writeBuffer.ReadableSlice())
}
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
@@ -230,6 +246,8 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_CLOSE_PRODUCER:
case pb.BaseCommand_CLOSE_CONSUMER:
case pb.BaseCommand_SEND_RECEIPT:
+ c.log.Info("Got SEND_RECEIPT: ", cmd.GetSendReceipt())
+
case pb.BaseCommand_SEND_ERROR:
case pb.BaseCommand_MESSAGE:
@@ -246,13 +264,21 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
}
}
+// Write queues data for the connection's run loop. It is equivalent to
+// WriteData and simply forwards to it.
+func (c *connection) Write(data []byte) {
+	c.WriteData(data)
+}
+
+// SendRequest wraps the command and its callback in a request and posts it
+// on the incomingRequests channel. The connection's run loop registers the
+// request in pendingReqs and writes the command, so those steps no longer
+// happen on the caller's goroutine.
func (c *connection) SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand)) {
- c.pendingReqs[requestId] = &request{
+ c.incomingRequests <- &request{
id: requestId,
cmd: req,
callback: callback,
}
- c.writeCommand(req)
+}
+
+// internalSendRequest records req in pendingReqs under its id and writes
+// its command to the connection.
+// NOTE(review): no caller is visible in this patch; the run loop appears to
+// perform the same two steps inline. Confirm whether it should call this.
+func (c *connection) internalSendRequest(req *request) {
+ c.pendingReqs[req.id] = req
+ c.writeCommand(req.cmd)
}
func (c *connection) handleResponse(requestId uint64, response *pb.BaseCommand) {
@@ -299,6 +325,7 @@ func (c *connection) Close() {
c.cnx.Close()
c.pingTicker.Stop()
close(c.incomingRequests)
+ close(c.writeRequests)
c.cnx = nil
}
}
@@ -313,4 +340,3 @@ func (c *connection) changeState(state connectionState) {
func (c *connection) newRequestId() uint64 {
return atomic.AddUint64(&c.requestIdGenerator, 1)
}
-
diff --git a/pulsar/impl/utils.go b/pulsar/impl/utils.go
new file mode 100644
index 0000000..02d15b3
--- /dev/null
+++ b/pulsar/impl/utils.go
@@ -0,0 +1,20 @@
+package impl
+
+import (
+ "sync/atomic"
+ "time"
+)
+
+// TimestampMillis converts t to milliseconds since the Unix epoch, derived
+// from t.UnixNano().
+func TimestampMillis(t time.Time) uint64 {
+	nanos := uint64(t.UnixNano())
+	nanosPerMilli := uint64(time.Millisecond)
+	return nanos / nanosPerMilli
+}
+
+// GetAndAdd atomically adds diff to *n and returns the value *n held
+// immediately before the addition.
+//
+// atomic.AddUint64 returns the new value, so subtracting diff recovers the
+// previous one; unsigned wrap-around keeps this exact even when the
+// addition overflows. A single atomic add also avoids reading *n with a
+// plain, non-atomic load, which is a data race under concurrent use.
+func GetAndAdd(n *uint64, diff uint64) uint64 {
+	return atomic.AddUint64(n, diff) - diff
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 7bb2165..2677770 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -2,10 +2,12 @@ package pulsar
import (
"context"
+ "github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"pulsar-client-go-native/pulsar/impl"
pb "pulsar-client-go-native/pulsar/pulsar_proto"
"sync"
+ "time"
)
type partitionProducer struct {
@@ -16,21 +18,36 @@ type partitionProducer struct {
cond *sync.Cond
cnx impl.Connection
- producerName *string
- producerId uint64
+ options *ProducerOptions
+ producerName *string
+ producerId uint64
+ batchBuilder *impl.BatchBuilder
+ sequenceIdGenerator *uint64
+ batchFlushTicker *time.Ticker
// Channel where app is posting messages to be published
eventsChan chan interface{}
}
+// defaultBatchingMaxPublishDelay is the batch flush interval used by
+// newPartitionProducer when options.BatchingMaxPublishDelay is zero.
+const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
+
func newPartitionProducer(client *client, topic string, options *ProducerOptions) (*partitionProducer, error) {
+ var batchingMaxPublishDelay time.Duration
+ if options.BatchingMaxPublishDelay != 0 {
+ batchingMaxPublishDelay = options.BatchingMaxPublishDelay
+ } else {
+ batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
+ }
+
p := &partitionProducer{
- log: log.WithField("topic", topic),
- client: client,
- topic: topic,
- producerId: client.rpcClient.NewProducerId(),
- eventsChan: make(chan interface{}),
+ log: log.WithField("topic", topic),
+ client: client,
+ topic: topic,
+ options: options,
+ producerId: client.rpcClient.NewProducerId(),
+ eventsChan: make(chan interface{}),
+ batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
}
if options.Name != "" {
@@ -74,6 +91,13 @@ func (p *partitionProducer) grabCnx() error {
}
p.producerName = res.Response.ProducerSuccess.ProducerName
+ if p.batchBuilder == nil {
+ p.batchBuilder = impl.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName, p.producerId)
+ }
+ if p.sequenceIdGenerator == nil {
+ nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
+ p.sequenceIdGenerator = &nextSequenceId
+ }
p.cnx = res.Cnx
p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
return nil
@@ -81,11 +105,15 @@ func (p *partitionProducer) grabCnx() error {
+// run is the producer's event loop. It handles send requests posted on
+// eventsChan and flushes the current batch each time batchFlushTicker fires.
+// NOTE(review): the loop has no exit path and the ticker is never stopped
+// here; confirm how the producer is meant to shut down.
func (p *partitionProducer) run() {
for {
- i := <-p.eventsChan
- switch v := i.(type) {
- case *sendRequest:
- p.log.Debug("Received send request: ", v)
- v.callback(nil, v.msg, nil)
+ select {
+ case i := <-p.eventsChan:
+ switch v := i.(type) {
+ case *sendRequest:
+ p.internalSend(v)
+ }
+
+ case _ = <-p.batchFlushTicker.C:
+ p.internalFlush()
}
}
}
@@ -98,6 +126,47 @@ func (p *partitionProducer) Name() string {
return *p.producerName
}
+// internalSend adds the message carried by request to the current batch.
+//
+// It builds the per-message metadata (payload size, plus event time,
+// partition key and properties when set), takes the next sequence id, and
+// adds the message to the batch builder. When the builder reports that the
+// batch is full, the batch is flushed and the add is retried.
+//
+// Messages that override ReplicationClusters are not supported yet and
+// cause a panic.
+//
+// NOTE(review): request.callback is not invoked on this path and
+// msg.SequenceID is not consulted; confirm the caller of Send is completed
+// elsewhere, presumably once the send receipt arrives.
+func (p *partitionProducer) internalSend(request *sendRequest) {
+	p.log.Debug("Received send request: ", *request)
+
+	msg := request.msg
+
+	if msg.ReplicationClusters != nil {
+		p.log.Panic("TODO: serialize into single message")
+		return
+	}
+
+	smm := &pb.SingleMessageMetadata{
+		PayloadSize: proto.Int(len(msg.Payload)),
+	}
+
+	if msg.EventTime != nil {
+		smm.EventTime = proto.Uint64(impl.TimestampMillis(*msg.EventTime))
+	}
+
+	if msg.Key != "" {
+		smm.PartitionKey = &msg.Key
+	}
+
+	if msg.Properties != nil {
+		smm.Properties = impl.ConvertFromStringMap(msg.Properties)
+	}
+
+	sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
+	for !p.batchBuilder.Add(smm, sequenceId, msg.Payload) {
+		// The current batch is full.. flush it and retry
+		p.internalFlush()
+	}
+}
+
+// internalFlush flushes the batch builder and, when it yields a non-empty
+// batch, hands the serialized frame to the connection for writing.
+func (p *partitionProducer) internalFlush() {
+	if batchData := p.batchBuilder.Flush(); batchData != nil {
+		p.cnx.WriteData(batchData)
+	}
+}
+
func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) error {
wg := sync.WaitGroup{}
wg.Add(1)
@@ -117,7 +186,6 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
wg.Wait()
return err
- return nil
}
type sendRequest struct {
diff --git a/pulsar/message.go b/pulsar/message.go
index 60dd989..12bc269 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -32,13 +32,13 @@ type ProducerMessage struct {
Properties map[string]string
// Set the event time for a given message
- EventTime time.Time
+ EventTime *time.Time
// Override the replication clusters for this message.
ReplicationClusters []string
// Set the sequence id to assign to the current message
- SequenceID int64
+ SequenceID *int64
}
type Message interface {