You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/11 17:37:10 UTC
[pulsar-client-go] branch master updated: Added a message id
tracker for acking messages that are batched. (#82)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1eb64f0 Added a message id tracker for acking messages that are batched. (#82)
1eb64f0 is described below
commit 1eb64f05d980d9d7e3b47a88351c53acd80796ef
Author: cckellogg <cc...@gmail.com>
AuthorDate: Mon Nov 11 09:37:04 2019 -0800
Added a message id tracker for acking messages that are batched. (#82)
* Added a message id tracker for acking messages that are batched.
* Update ack tracker functions.
---
pulsar/impl_message.go | 77 ++++++++++++++++++++++++++++++++++++---------
pulsar/impl_message_test.go | 57 +++++++++++++++++++++++++++++++++
pulsar/message.go | 19 ++++++-----
3 files changed, 132 insertions(+), 21 deletions(-)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 38d372a..a8e079d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,30 +18,31 @@
package pulsar
import (
+ "math/big"
+ "strings"
+ "sync"
"time"
- "github.com/apache/pulsar-client-go/pkg/pb"
"github.com/golang/protobuf/proto"
-)
-func earliestMessageID() MessageID {
- return newMessageID(-1, -1, -1, -1)
-}
+ "github.com/apache/pulsar-client-go/pkg/pb"
+)
type messageID struct {
ledgerID int64
entryID int64
batchIdx int
partitionIdx int
+
+ tracker *ackTracker
}
-func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
- return &messageID{
- ledgerID: ledgerID,
- entryID: entryID,
- batchIdx: batchIdx,
- partitionIdx: partitionIdx,
+func (id *messageID) ack() bool {
+ if id.tracker != nil && id.batchIdx > -1 {
+ return id.tracker.ack(id.batchIdx)
}
+
+ return true
}
func (id *messageID) Serialize() []byte {
@@ -70,10 +71,24 @@ func deserializeMessageID(data []byte) (MessageID, error) {
return id, nil
}
-const maxLong int64 = 0x7fffffffffffffff
+func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
+ return &messageID{
+ ledgerID: ledgerID,
+ entryID: entryID,
+ batchIdx: batchIdx,
+ partitionIdx: partitionIdx,
+ }
+}
-func latestMessageID() MessageID {
- return newMessageID(maxLong, maxLong, -1, -1)
+func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int,
+ tracker *ackTracker) *messageID {
+ return &messageID{
+ ledgerID: ledgerID,
+ entryID: entryID,
+ batchIdx: batchIdx,
+ partitionIdx: partitionIdx,
+ tracker: tracker,
+ }
}
func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
@@ -126,3 +141,37 @@ func (msg *message) EventTime() time.Time {
func (msg *message) Key() string {
return msg.key
}
+
+func newAckTracker(size int) *ackTracker {
+ var batchIDs *big.Int
+ if size <= 64 {
+ shift := uint32(64 - size)
+ setBits := ^uint64(0) >> shift
+ batchIDs = new(big.Int).SetUint64(setBits)
+ } else {
+ batchIDs, _ = new(big.Int).SetString(strings.Repeat("1", size), 2)
+ }
+ return &ackTracker{
+ size: size,
+ batchIDs: batchIDs,
+ }
+}
+
+type ackTracker struct {
+ sync.Mutex
+ size int
+ batchIDs *big.Int
+}
+
+func (t *ackTracker) ack(batchID int) bool {
+ t.Lock()
+ defer t.Unlock()
+ t.batchIDs = t.batchIDs.SetBit(t.batchIDs, batchID, 0)
+ return len(t.batchIDs.Bits()) == 0
+}
+
+func (t *ackTracker) completed() bool {
+ t.Lock()
+ defer t.Unlock()
+ return len(t.batchIDs.Bits()) == 0
+}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index bd48495..e4ba3c1 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -44,3 +44,60 @@ func TestMessageId(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, id)
}
+
+func TestAckTracker(t *testing.T) {
+ tracker := newAckTracker(1)
+ assert.Equal(t, true, tracker.ack(0))
+
+ // test 64
+ tracker = newAckTracker(64)
+ for i := 0; i < 64; i++ {
+ if i < 63 {
+ assert.Equal(t, false, tracker.ack(i))
+ } else {
+ assert.Equal(t, true, tracker.ack(i))
+ }
+ }
+ assert.Equal(t, true, tracker.completed())
+
+ // test large number 1000
+ tracker = newAckTracker(1000)
+ for i := 0; i < 1000; i++ {
+ if i < 999 {
+ assert.Equal(t, false, tracker.ack(i))
+ } else {
+ assert.Equal(t, true, tracker.ack(i))
+ }
+
+ }
+ assert.Equal(t, true, tracker.completed())
+}
+
+func TestAckingMessageIDBatchOne(t *testing.T) {
+ tracker := newAckTracker(1)
+ msgId := newTrackingMessageID(1, 1, 0, 0, tracker)
+ assert.Equal(t, true, msgId.ack())
+ assert.Equal(t, true, tracker.completed())
+}
+
+func TestAckingMessageIDBatchTwo(t *testing.T) {
+ tracker := newAckTracker(2)
+ ids := []*messageID{
+ newTrackingMessageID(1, 1, 0, 0, tracker),
+ newTrackingMessageID(1, 1, 1, 0, tracker),
+ }
+
+ assert.Equal(t, false, ids[0].ack())
+ assert.Equal(t, true, ids[1].ack())
+ assert.Equal(t, true, tracker.completed())
+
+ // try reverse order
+ tracker = newAckTracker(2)
+ ids = []*messageID{
+ newTrackingMessageID(1, 1, 0, 0, tracker),
+ newTrackingMessageID(1, 1, 1, 0, tracker),
+ }
+ assert.Equal(t, false, ids[1].ack())
+ assert.Equal(t, true, ids[0].ack())
+ assert.Equal(t, true, tracker.completed())
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index a70827d..dd4fcff 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -17,7 +17,10 @@
package pulsar
-import "time"
+import (
+ "math"
+ "time"
+)
// ProducerMessage abstraction used in Pulsar producer
type ProducerMessage struct {
@@ -79,10 +82,12 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
return deserializeMessageID(data)
}
-var (
- // EarliestMessage messageID that points to the earliest message available in a topic
- EarliestMessage = earliestMessageID()
+// EarliestMessageID returns a messageID that points to the earliest message available in a topic
+func EarliestMessageID() MessageID {
+ return newMessageID(-1, -1, -1, -1)
+}
- // LatestMessage messageID that points to the latest message
- LatestMessage = latestMessageID()
-)
+// LatestMessage returns a messageID that points to the latest message
+func LatestMessageID() MessageID {
+ return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+}