You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/11 17:37:10 UTC

[pulsar-client-go] branch master updated: Added a message id tracker for acking messages that are batched. (#82)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb64f0  Added a message id tracker for acking messages that are batched. (#82)
1eb64f0 is described below

commit 1eb64f05d980d9d7e3b47a88351c53acd80796ef
Author: cckellogg <cc...@gmail.com>
AuthorDate: Mon Nov 11 09:37:04 2019 -0800

    Added a message id tracker for acking messages that are batched. (#82)
    
    * Added a message id tracker for acking messages that are batched.
    
    * Update ack tracker functions.
---
 pulsar/impl_message.go      | 77 ++++++++++++++++++++++++++++++++++++---------
 pulsar/impl_message_test.go | 57 +++++++++++++++++++++++++++++++++
 pulsar/message.go           | 19 ++++++-----
 3 files changed, 132 insertions(+), 21 deletions(-)

diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 38d372a..a8e079d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,30 +18,31 @@
 package pulsar
 
 import (
+	"math/big"
+	"strings"
+	"sync"
 	"time"
 
-	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/golang/protobuf/proto"
-)
 
-func earliestMessageID() MessageID {
-	return newMessageID(-1, -1, -1, -1)
-}
+	"github.com/apache/pulsar-client-go/pkg/pb"
+)
 
 type messageID struct {
 	ledgerID     int64
 	entryID      int64
 	batchIdx     int
 	partitionIdx int
+
+	tracker *ackTracker // nil for IDs built by newMessageID
 }
 
-func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
-	return &messageID{
-		ledgerID:     ledgerID,
-		entryID:      entryID,
-		batchIdx:     batchIdx,
-		partitionIdx: partitionIdx,
+func (id *messageID) ack() bool {
+	if id.tracker != nil && id.batchIdx > -1 { // big.Int.SetBit panics on a negative index
+		return id.tracker.ack(id.batchIdx)
 	}
+
+	return true // untracked or non-batched: nothing else has to be acked first
 }
 
 func (id *messageID) Serialize() []byte {
@@ -70,10 +71,24 @@ func deserializeMessageID(data []byte) (MessageID, error) {
 	return id, nil
 }
 
-const maxLong int64 = 0x7fffffffffffffff
+func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
+	return &messageID{
+		ledgerID:     ledgerID,
+		entryID:      entryID,
+		batchIdx:     batchIdx,
+		partitionIdx: partitionIdx,
+	}
+}
 
-func latestMessageID() MessageID {
-	return newMessageID(maxLong, maxLong, -1, -1)
+func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int,
+	tracker *ackTracker) *messageID {
+	return &messageID{
+		ledgerID:     ledgerID,
+		entryID:      entryID,
+		batchIdx:     batchIdx,
+		partitionIdx: partitionIdx,
+		tracker:      tracker,
+	}
 }
 
 func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
@@ -126,3 +141,37 @@ func (msg *message) EventTime() time.Time {
 func (msg *message) Key() string {
 	return msg.key
 }
+
+func newAckTracker(size int) *ackTracker { // bits 0..size-1 start set, one per unacked entry
+	var batchIDs *big.Int
+	if size <= 64 {
+		shift := uint32(64 - size)
+		setBits := ^uint64(0) >> shift // low size bits set; 0 when size is 0
+		batchIDs = new(big.Int).SetUint64(setBits)
+	} else {
+		batchIDs, _ = new(big.Int).SetString(strings.Repeat("1", size), 2) // ok ignored: a run of 1s always parses in base 2
+	}
+	return &ackTracker{
+		size:     size,
+		batchIDs: batchIDs,
+	}
+}
+
+type ackTracker struct {
+	sync.Mutex
+	size     int
+	batchIDs *big.Int // bit i set means batch entry i is not yet acked
+}
+
+func (t *ackTracker) ack(batchID int) bool { // clears batchID's bit; true once no bits remain
+	t.Lock()
+	defer t.Unlock()
+	t.batchIDs = t.batchIDs.SetBit(t.batchIDs, batchID, 0)
+	return len(t.batchIDs.Bits()) == 0 // a zero big.Int has an empty word slice
+}
+
+func (t *ackTracker) completed() bool { // true once every batch entry has been acked
+	t.Lock()
+	defer t.Unlock()
+	return len(t.batchIDs.Bits()) == 0
+}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index bd48495..e4ba3c1 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -44,3 +44,60 @@ func TestMessageId(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, id)
 }
+
+func TestAckTracker(t *testing.T) {
+	tracker := newAckTracker(1)
+	assert.True(t, tracker.ack(0))
+
+	// 64 entries: the largest size built from a single uint64 mask
+	tracker = newAckTracker(64)
+	for i := 0; i < 64; i++ {
+		if i < 63 {
+			assert.False(t, tracker.ack(i))
+		} else {
+			assert.True(t, tracker.ack(i))
+		}
+	}
+	assert.True(t, tracker.completed())
+
+	// 1000 entries: exercises the big.Int SetString path (size > 64)
+	tracker = newAckTracker(1000)
+	for i := 0; i < 1000; i++ {
+		if i < 999 {
+			assert.False(t, tracker.ack(i))
+		} else {
+			assert.True(t, tracker.ack(i))
+		}
+
+	}
+	assert.True(t, tracker.completed())
+}
+
+func TestAckingMessageIDBatchOne(t *testing.T) {
+	tracker := newAckTracker(1)
+	msgID := newTrackingMessageID(1, 1, 0, 0, tracker)
+	assert.True(t, msgID.ack())
+	assert.True(t, tracker.completed())
+}
+
+func TestAckingMessageIDBatchTwo(t *testing.T) {
+	tracker := newAckTracker(2)
+	ids := []*messageID{
+		newTrackingMessageID(1, 1, 0, 0, tracker),
+		newTrackingMessageID(1, 1, 1, 0, tracker),
+	}
+
+	assert.False(t, ids[0].ack())
+	assert.True(t, ids[1].ack())
+	assert.True(t, tracker.completed())
+
+	// try reverse order
+	tracker = newAckTracker(2)
+	ids = []*messageID{
+		newTrackingMessageID(1, 1, 0, 0, tracker),
+		newTrackingMessageID(1, 1, 1, 0, tracker),
+	}
+	assert.False(t, ids[1].ack())
+	assert.True(t, ids[0].ack())
+	assert.True(t, tracker.completed())
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index a70827d..dd4fcff 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "time"
+import (
+	"math"
+	"time"
+)
 
 // ProducerMessage abstraction used in Pulsar producer
 type ProducerMessage struct {
@@ -79,10 +82,12 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
 	return deserializeMessageID(data)
 }
 
-var (
-	// EarliestMessage messageID that points to the earliest message available in a topic
-	EarliestMessage = earliestMessageID()
+// EarliestMessageID returns a MessageID that points to the earliest message available in a topic.
+func EarliestMessageID() MessageID {
+	return newMessageID(-1, -1, -1, -1)
+}
 
-	// LatestMessage messageID that points to the latest message
-	LatestMessage = latestMessageID()
-)
+// LatestMessageID returns a MessageID that points to the latest message.
+func LatestMessageID() MessageID {
+	return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+}