You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/12/11 18:22:47 UTC
[pulsar-client-go] branch master updated: [Issue
118][client_impl.go] fix race condition when accessing client.handlers
(#119)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new dfc17ab [Issue 118][client_impl.go] fix race condition when accessing client.handlers (#119)
dfc17ab is described below
commit dfc17abd753ac2e488021aede74433e681b5df58
Author: Jim Lambert <ji...@bose.com>
AuthorDate: Wed Dec 11 13:22:40 2019 -0500
[Issue 118][client_impl.go] fix race condition when accessing client.handlers (#119)
* fix race condition when accessing client.handlers
* added newline to satisfy license test
* refactor Set(Closable, bool) to Add(Closable) for a more explicit
---
pulsar/client_impl.go | 12 ++++----
pulsar/internal/client_handlers.go | 52 +++++++++++++++++++++++++++++++++
pulsar/internal/client_handlers_test.go | 46 +++++++++++++++++++++++++++++
3 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 9cb3442..d0c36fe 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -39,7 +39,7 @@ type client struct {
lookupService internal.LookupService
auth auth.Provider
- handlers map[internal.Closable]bool
+ handlers internal.ClientHandlers
producerIDGenerator uint64
consumerIDGenerator uint64
}
@@ -86,14 +86,14 @@ func newClient(options ClientOptions) (Client, error) {
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
c.lookupService = internal.NewLookupService(c.rpcClient, url)
- c.handlers = make(map[internal.Closable]bool)
+ c.handlers = internal.NewClientHandlers()
return c, nil
}
func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
producer, err := newProducer(c, &options)
if err == nil {
- c.handlers[producer] = true
+ c.handlers.Add(producer)
}
return producer, err
}
@@ -103,7 +103,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
- c.handlers[consumer] = true
+ c.handlers.Add(consumer)
return consumer, nil
}
@@ -145,9 +145,7 @@ func (c *client) TopicPartitions(topic string) ([]string, error) {
}
func (c *client) Close() {
- for handler := range c.handlers {
- handler.Close()
- }
+ c.handlers.Close()
}
func (c *client) namespaceTopics(namespace string) ([]string, error) {
diff --git a/pulsar/internal/client_handlers.go b/pulsar/internal/client_handlers.go
new file mode 100644
index 0000000..1ecfdd9
--- /dev/null
+++ b/pulsar/internal/client_handlers.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "sync"
+
+// ClientHandlers is a simple concurrent-safe map for the client type
+type ClientHandlers struct {
+ handlers map[Closable]bool // keys are the registered handlers; Add always stores true
+ l *sync.RWMutex // guards handlers; held as a pointer, so copies of the struct value share one lock
+}
+
+func NewClientHandlers() ClientHandlers { // NewClientHandlers returns a ClientHandlers value that is ready to use.
+ return ClientHandlers{
+ handlers: map[Closable]bool{}, // empty, non-nil map so Add can write to it
+ l: &sync.RWMutex{},
+ }
+}
+func (h *ClientHandlers) Add(c Closable) { // Add registers c in the handler map.
+ h.l.Lock() // write lock: the map is mutated below
+ defer h.l.Unlock()
+ h.handlers[c] = true
+}
+func (h *ClientHandlers) Val(c Closable) bool { // Val reports whether c has been registered with Add.
+ h.l.RLock() // read lock is enough: the map is only looked up
+ defer h.l.RUnlock()
+ return h.handlers[c] // a key that was never added yields false
+}
+
+func (h *ClientHandlers) Close() { // Close calls Close on every registered handler; entries are not removed from the map.
+ h.l.Lock() // held for the whole loop, so concurrent Add/Val calls wait until all handlers are closed
+ defer h.l.Unlock()
+
+ for handler := range h.handlers {
+ handler.Close() // NOTE(review): runs under h.l; a handler whose Close calls back into this ClientHandlers would deadlock - confirm none do
+ }
+}
diff --git a/pulsar/internal/client_handlers_test.go b/pulsar/internal/client_handlers_test.go
new file mode 100644
index 0000000..baaf899
--- /dev/null
+++ b/pulsar/internal/client_handlers_test.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestClientHandlers(t *testing.T) { // TestClientHandlers covers construction, Add/Val, and Close in a single goroutine.
+ h := NewClientHandlers()
+ assert.NotNil(t, h.l) // the constructor must allocate the lock
+ assert.Equal(t, h.handlers, map[Closable]bool{}) // and start with an empty, non-nil map
+
+ closable := &testClosable{false}
+ h.Add(closable)
+ assert.True(t, h.Val(closable)) // an added handler is reported as present
+
+ h.Close()
+ t.Log("closable is: ", closable.closed)
+ assert.True(t, closable.closed) // Close on the collection must reach the registered handler
+}
+
+type testClosable struct { // testClosable is a test double that records whether its Close method ran.
+ closed bool // set to true by Close
+}
+
+func (t *testClosable) Close() { // Close marks the test double as closed.
+ t.closed = true
+}