You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/24 22:20:24 UTC
[pulsar-client-go] branch master updated: Added semaphore
implementation with lower contention (#298)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new cc08fd6 Added semaphore implementation with lower contention (#298)
cc08fd6 is described below
commit cc08fd61530a78dfd258acc5c23bfc79f42e98d2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 24 15:20:14 2020 -0700
Added semaphore implementation with lower contention (#298)
---
pulsar/internal/semaphore.go | 92 +++++++++++++++++++++++++++++++--------
pulsar/internal/semaphore_test.go | 60 +++++++++++++++++++++++++
pulsar/producer_partition.go | 11 +----
3 files changed, 135 insertions(+), 28 deletions(-)
diff --git a/pulsar/internal/semaphore.go b/pulsar/internal/semaphore.go
index e08995c..a34497f 100644
--- a/pulsar/internal/semaphore.go
+++ b/pulsar/internal/semaphore.go
@@ -17,27 +17,81 @@
package internal
-// Semaphore is a channel of bool, used to receive a bool type semaphore.
-type Semaphore chan bool
+import (
+ "sync/atomic"
-// Acquire a permit from this semaphore, blocking until one is available.
+ log "github.com/sirupsen/logrus"
+)
-// Acquire a permit, if one is available and returns immediately,
-// reducing the number of available permits by one.
-func (s Semaphore) Acquire() {
- s <- true
+type Semaphore interface { // Semaphore hands out a bounded number of permits.
+ // Acquire a permit, blocking until one is available, and reduce
+ // the number of available permits by one.
+ Acquire()
+
+ // TryAcquire attempts to acquire a permit without blocking. It
+ // returns `true` if a permit was acquired and `false` if none
+ // was available.
+ TryAcquire() bool
+
+ // Release a permit, returning it to the semaphore and increasing
+ // the number of available permits by one.
+ // If any goroutines are blocked trying to acquire a permit, then one
+ // of them is selected and given the permit that was just released,
+ // which unblocks that goroutine.
+ // There is no requirement that a goroutine that releases a permit must
+ // have acquired that permit by calling Acquire().
+ // Correct usage of a semaphore is established by programming convention
+ // in the application.
+ Release()
+}
+
+type semaphore struct { // semaphore is the atomic-counter-plus-channel implementation returned by NewSemaphore
+ maxPermits int32 // limit on permits held at once; only written in NewSemaphore in the code shown
+ permits int32 // permits currently held plus callers blocked in Acquire; accessed via sync/atomic
+ ch chan bool // unbuffered; Release sends on it to wake one caller blocked in Acquire
+}
+
+func NewSemaphore(maxPermits int32) Semaphore { // NewSemaphore returns a Semaphore that allows up to maxPermits permits to be held at once
+ if maxPermits <= 0 {
+ log.Fatal("Max permits for semaphore needs to be > 0") // logrus Fatal: logs the message, then exits the process
+ }
+
+ return &semaphore{
+ maxPermits: maxPermits,
+ permits: 0, // no permits handed out yet
+ ch: make(chan bool), // unbuffered, so a send completes only when a waiter receives
+ }
+}
+
+func (s *semaphore) Acquire() { // Acquire takes a permit, blocking while all maxPermits are in use
+ permits := atomic.AddInt32(&s.permits, 1) // always count this caller, even if it ends up waiting
+ if permits <= s.maxPermits {
+ return // fast path: a permit was free
+ }
+
+ // Over the limit: block until a Release call hands over its permit on the channel
+ <-s.ch
+}
+
+func (s *semaphore) TryAcquire() bool { // TryAcquire takes a permit only if one is free; it never waits on the channel
+ for { // retry loop: repeats only when the compare-and-swap below loses a race
+ currentPermits := atomic.LoadInt32(&s.permits)
+ if currentPermits >= s.maxPermits {
+ // All the permits are already exhausted
+ return false
+ }
+
+ if atomic.CompareAndSwapInt32(&s.permits, currentPermits, currentPermits+1) {
+ // Counter moved from currentPermits to currentPermits+1: permit acquired
+ return true
+ }
+ }
}
-// Release a permit, returning it to the semaphore.
-
-// Release a permit, increasing the number of available permits by
-// one. If any threads are trying to acquire a permit, then one is
-// selected and given the permit that was just released. That thread
-// is (re)enabled for thread scheduling purposes.
-// There is no requirement that a thread that releases a permit must
-// have acquired that permit by calling Acquire().
-// Correct usage of a semaphore is established by programming convention
-// in the application.
-func (s Semaphore) Release() {
- <-s
+func (s *semaphore) Release() { // Release returns a permit and, if a caller is waiting in Acquire, hands the permit to it
+ permits := atomic.AddInt32(&s.permits, -1)
+ if permits >= s.maxPermits { // counter was above maxPermits before the decrement: an Acquire caller is waiting or about to wait
+ // Hand the permit to one waiting Acquire caller; the unbuffered send blocks until it is received
+ s.ch <- true
+ }
}
diff --git a/pulsar/internal/semaphore_test.go b/pulsar/internal/semaphore_test.go
new file mode 100644
index 0000000..0de69fc
--- /dev/null
+++ b/pulsar/internal/semaphore_test.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestSemaphore(t *testing.T) { // 10 goroutines contend for 3 permits; there are no assertions, the test passes once every goroutine finishes
+ s := NewSemaphore(3)
+
+ const n = 10
+
+ wg := sync.WaitGroup{}
+ wg.Add(n)
+
+ for i := 0; i < n; i++ {
+ go func() {
+ s.Acquire()
+ time.Sleep(100 * time.Millisecond) // hold the permit so the other goroutines have to wait in Acquire
+ s.Release()
+ wg.Done()
+ }()
+ }
+
+ wg.Wait() // would hang if a blocked Acquire were never woken by a Release
+}
+
+func TestSemaphore_TryAcquire(t *testing.T) { // checks TryAcquire results around Acquire/Release on a single-permit semaphore
+ s := NewSemaphore(1)
+
+ s.Acquire() // takes the only permit
+
+ assert.False(t, s.TryAcquire()) // no permit left
+
+ s.Release()
+
+ assert.True(t, s.TryAcquire()) // the released permit can be taken again
+ assert.False(t, s.TryAcquire()) // and the semaphore is exhausted once more
+ s.Release()
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 38eb87f..18bb9a3 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -96,7 +96,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
- publishSemaphore: make(internal.Semaphore, maxPendingMessages),
+ publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: partitionIdx,
@@ -387,14 +387,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
- p.publishSemaphore.Acquire()
- sr := &sendRequest{
- ctx: ctx,
- msg: msg,
- callback: callback,
- flushImmediately: false,
- }
- p.eventsChan <- sr
+ p.internalSendAsync(ctx, msg, callback, false)
}
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,