You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/08 21:17:51 UTC

[pulsar-client-go] branch master updated: Reduce the size of the MessageID structs by one word on 64-bit arch (#316)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fcaf26  Reduce the size of the MessageID structs by one word on 64-bit arch (#316)
6fcaf26 is described below

commit 6fcaf266480c5e7fe258a67d028cb5c69b835b39
Author: dferstay <df...@users.noreply.github.com>
AuthorDate: Wed Jul 8 14:17:41 2020 -0700

    Reduce the size of the MessageID structs by one word on 64-bit arch (#316)
    
    An int occupies one word of memory; on 64-bit machines, this is 8 bytes.
    
    As a result, the messageID struct is 56 bytes on 64-bit machines:
    * ledgerID - 8 bytes
    * entryID - 8 bytes
    * batchIdx - 8 bytes
    * partitionIdx - 8 bytes
    * tracker - 8 bytes
    * consumer - 16 bytes (1 word for type, 1 word for data address)
    
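    This commit changes the type of the batchIdx and partitionIdx fields to
    int32, which saves one word of memory: the two 4-byte fields now share a
    single 8-byte word, so the struct shrinks from 56 to 48 bytes on 64-bit
    machines while the remaining fields stay naturally aligned.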
    
    Reducing the size of the MessageID structs is important as they are
    currently allocated on the heap for every message produced or consumed.
    
    Signed-off-by: Daniel Ferstay <df...@splunk.com>
    
    Co-authored-by: Daniel Ferstay <df...@splunk.com>
---
 pulsar/consumer_impl.go      |  2 +-
 pulsar/consumer_partition.go |  8 ++++----
 pulsar/impl_message.go       | 18 +++++++++---------
 pulsar/impl_message_test.go  |  4 ++--
 pulsar/producer_partition.go |  6 +++---
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 0a3220a..20dc1af 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -459,7 +459,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
 		return nil, false
 	}
 
-	partition := mid.partitionIdx
+	partition := int(mid.partitionIdx)
 	// did we receive a valid partition index?
 	if partition < 0 || partition >= len(c.consumers) {
 		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b4498f6..cffc2b4 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -83,7 +83,7 @@ type partitionConsumer struct {
 	topic        string
 	name         string
 	consumerID   uint64
-	partitionIdx int
+	partitionIdx int32
 
 	// shared channel
 	messageCh chan ConsumerMessage
@@ -120,7 +120,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		topic:                options.topic,
 		name:                 options.consumerName,
 		consumerID:           client.rpcClient.NewConsumerID(),
-		partitionIdx:         options.partitionIdx,
+		partitionIdx:         int32(options.partitionIdx),
 		eventsCh:             make(chan interface{}, 3),
 		queueSize:            int32(options.receiverQueueSize),
 		queueCh:              make(chan []*message, options.receiverQueueSize),
@@ -400,7 +400,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		msgID := newTrackingMessageID(
 			int64(pbMsgID.GetLedgerId()),
 			int64(pbMsgID.GetEntryId()),
-			i,
+			int32(i),
 			pc.partitionIdx,
 			ackTracker)
 
@@ -923,7 +923,7 @@ func convertToMessageID(id *pb.MessageIdData) *messageID {
 	}
 
 	if id.BatchIndex != nil {
-		msgID.batchIdx = int(*id.BatchIndex)
+		msgID.batchIdx = *id.BatchIndex
 	}
 
 	return msgID
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 6aa4223..d9574cd 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -31,8 +31,8 @@ import (
 type messageID struct {
 	ledgerID     int64
 	entryID      int64
-	batchIdx     int
-	partitionIdx int
+	batchIdx     int32
+	partitionIdx int32
 
 	tracker  *ackTracker
 	consumer acker
@@ -56,7 +56,7 @@ func (id *messageID) Nack() {
 
 func (id *messageID) ack() bool {
 	if id.tracker != nil && id.batchIdx > -1 {
-		return id.tracker.ack(id.batchIdx)
+		return id.tracker.ack(int(id.batchIdx))
 	}
 	return true
 }
@@ -87,8 +87,8 @@ func (id *messageID) Serialize() []byte {
 	msgID := &pb.MessageIdData{
 		LedgerId:   proto.Uint64(uint64(id.ledgerID)),
 		EntryId:    proto.Uint64(uint64(id.entryID)),
-		BatchIndex: proto.Int(id.batchIdx),
-		Partition:  proto.Int(id.partitionIdx),
+		BatchIndex: proto.Int(int(id.batchIdx)),
+		Partition:  proto.Int(int(id.partitionIdx)),
 	}
 	data, _ := proto.Marshal(msgID)
 	return data
@@ -103,13 +103,13 @@ func deserializeMessageID(data []byte) (MessageID, error) {
 	id := newMessageID(
 		int64(msgID.GetLedgerId()),
 		int64(msgID.GetEntryId()),
-		int(msgID.GetBatchIndex()),
-		int(msgID.GetPartition()),
+		msgID.GetBatchIndex(),
+		msgID.GetPartition(),
 	)
 	return id, nil
 }
 
-func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
+func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID {
 	return &messageID{
 		ledgerID:     ledgerID,
 		entryID:      entryID,
@@ -118,7 +118,7 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int)
 	}
 }
 
-func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int,
+func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32,
 	tracker *ackTracker) *messageID {
 	return &messageID{
 		ledgerID:     ledgerID,
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 54c3bee..164cff6 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -33,8 +33,8 @@ func TestMessageId(t *testing.T) {
 
 	assert.Equal(t, int64(1), id2.(*messageID).ledgerID)
 	assert.Equal(t, int64(2), id2.(*messageID).entryID)
-	assert.Equal(t, 3, id2.(*messageID).batchIdx)
-	assert.Equal(t, 4, id2.(*messageID).partitionIdx)
+	assert.Equal(t, int32(3), id2.(*messageID).batchIdx)
+	assert.Equal(t, int32(4), id2.(*messageID).partitionIdx)
 
 	id, err = DeserializeMessageID(nil)
 	assert.Error(t, err)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e4765fc..239fcd6 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -69,7 +69,7 @@ type partitionProducer struct {
 	pendingQueue     internal.BlockingQueue
 	lastSequenceID   int64
 
-	partitionIdx int
+	partitionIdx int32
 }
 
 func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (
@@ -105,7 +105,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
 		lastSequenceID:   -1,
-		partitionIdx:     partitionIdx,
+		partitionIdx:     int32(partitionIdx),
 	}
 
 	if options.Name != "" {
@@ -442,7 +442,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 			msgID := newMessageID(
 				int64(response.MessageId.GetLedgerId()),
 				int64(response.MessageId.GetEntryId()),
-				idx,
+				int32(idx),
 				p.partitionIdx,
 			)
 			sr.callback(msgID, sr.msg, nil)