You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/24 22:20:24 UTC

[pulsar-client-go] branch master updated: Added semaphore implementation with lower contention (#298)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cc08fd6  Added semaphore implementation with lower contention (#298)
cc08fd6 is described below

commit cc08fd61530a78dfd258acc5c23bfc79f42e98d2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 24 15:20:14 2020 -0700

    Added semaphore implementation with lower contention (#298)
---
 pulsar/internal/semaphore.go      | 92 +++++++++++++++++++++++++++++++--------
 pulsar/internal/semaphore_test.go | 60 +++++++++++++++++++++++++
 pulsar/producer_partition.go      | 11 +----
 3 files changed, 135 insertions(+), 28 deletions(-)

diff --git a/pulsar/internal/semaphore.go b/pulsar/internal/semaphore.go
index e08995c..a34497f 100644
--- a/pulsar/internal/semaphore.go
+++ b/pulsar/internal/semaphore.go
@@ -17,27 +17,81 @@
 
 package internal
 
-// Semaphore is a channel of bool, used to receive a bool type semaphore.
-type Semaphore chan bool
+import (
+	"sync/atomic"
 
-// Acquire a permit from this semaphore, blocking until one is available.
+	log "github.com/sirupsen/logrus"
+)
 
-// Acquire a permit, if one is available and returns immediately,
-// reducing the number of available permits by one.
-func (s Semaphore) Acquire() {
-	s <- true
+type Semaphore interface {
+	// Acquire a permit, if one is available and returns immediately,
+	// reducing the number of available permits by one.
+	Acquire()
+
+	// Try to acquire a permit. The method will return immediately
+	// with a `true` if it was possible to acquire a permit and
+	// `false` otherwise.
+	TryAcquire() bool
+
+	// Release a permit, returning it to the semaphore.
+	// Release a permit, increasing the number of available permits by
+	// one.  If any threads are trying to acquire a permit, then one is
+	// selected and given the permit that was just released.  That thread
+	// is (re)enabled for thread scheduling purposes.
+	// There is no requirement that a thread that releases a permit must
+	// have acquired that permit by calling Acquire().
+	// Correct usage of a semaphore is established by programming convention
+	// in the application.
+	Release()
+}
+
+type semaphore struct {
+	maxPermits int32
+	permits    int32
+	ch         chan bool
+}
+
+func NewSemaphore(maxPermits int32) Semaphore {
+	if maxPermits <= 0 {
+		log.Fatal("Max permits for semaphore needs to be > 0")
+	}
+
+	return &semaphore{
+		maxPermits: maxPermits,
+		permits:    0,
+		ch:         make(chan bool),
+	}
+}
+
+func (s *semaphore) Acquire() {
+	permits := atomic.AddInt32(&s.permits, 1)
+	if permits <= s.maxPermits {
+		return
+	}
+
+	// Block on the channel until a new permit is available
+	<-s.ch
+}
+
+func (s *semaphore) TryAcquire() bool {
+	for {
+		currentPermits := atomic.LoadInt32(&s.permits)
+		if currentPermits >= s.maxPermits {
+			// All the permits are already exhausted
+			return false
+		}
+
+		if atomic.CompareAndSwapInt32(&s.permits, currentPermits, currentPermits+1) {
+			// Successfully incremented counter
+			return true
+		}
+	}
 }
 
-// Release a permit, returning it to the semaphore.
-
-// Release a permit, increasing the number of available permits by
-// one.  If any threads are trying to acquire a permit, then one is
-// selected and given the permit that was just released.  That thread
-// is (re)enabled for thread scheduling purposes.
-// There is no requirement that a thread that releases a permit must
-// have acquired that permit by calling Acquire().
-// Correct usage of a semaphore is established by programming convention
-// in the application.
-func (s Semaphore) Release() {
-	<-s
+func (s *semaphore) Release() {
+	permits := atomic.AddInt32(&s.permits, -1)
+	if permits >= s.maxPermits {
+		// Unblock the next in line to acquire the semaphore
+		s.ch <- true
+	}
 }
diff --git a/pulsar/internal/semaphore_test.go b/pulsar/internal/semaphore_test.go
new file mode 100644
index 0000000..0de69fc
--- /dev/null
+++ b/pulsar/internal/semaphore_test.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSemaphore(t *testing.T) {
+	s := NewSemaphore(3)
+
+	const n = 10
+
+	wg := sync.WaitGroup{}
+	wg.Add(n)
+
+	for i := 0; i < n; i++ {
+		go func() {
+			s.Acquire()
+			time.Sleep(100 * time.Millisecond)
+			s.Release()
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
+
+func TestSemaphore_TryAcquire(t *testing.T) {
+	s := NewSemaphore(1)
+
+	s.Acquire()
+
+	assert.False(t, s.TryAcquire())
+
+	s.Release()
+
+	assert.True(t, s.TryAcquire())
+	assert.False(t, s.TryAcquire())
+	s.Release()
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 38eb87f..18bb9a3 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -96,7 +96,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		producerID:       client.rpcClient.NewProducerID(),
 		eventsChan:       make(chan interface{}, maxPendingMessages),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
-		publishSemaphore: make(internal.Semaphore, maxPendingMessages),
+		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
 		lastSequenceID:   -1,
 		partitionIdx:     partitionIdx,
@@ -387,14 +387,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: false,
-	}
-	p.eventsChan <- sr
+	p.internalSendAsync(ctx, msg, callback, false)
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,