You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/12/11 18:22:47 UTC

[pulsar-client-go] branch master updated: [Issue 118][client_impl.go] fix race condition when accessing client.handlers (#119)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc17ab  [Issue 118][client_impl.go] fix race condition when accessing client.handlers (#119)
dfc17ab is described below

commit dfc17abd753ac2e488021aede74433e681b5df58
Author: Jim Lambert <ji...@bose.com>
AuthorDate: Wed Dec 11 13:22:40 2019 -0500

    [Issue 118][client_impl.go] fix race condition when accessing client.handlers (#119)
    
    * fix race condition when accessing client.handlers
    
    * added newline to satisfy license test
    
    * refactor Set(Closable, bool) to Add(Closable) for a more explicit API
---
 pulsar/client_impl.go                   | 12 ++++----
 pulsar/internal/client_handlers.go      | 52 +++++++++++++++++++++++++++++++++
 pulsar/internal/client_handlers_test.go | 46 +++++++++++++++++++++++++++++
 3 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 9cb3442..d0c36fe 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -39,7 +39,7 @@ type client struct {
 	lookupService internal.LookupService
 	auth          auth.Provider
 
-	handlers            map[internal.Closable]bool
+	handlers            internal.ClientHandlers
 	producerIDGenerator uint64
 	consumerIDGenerator uint64
 }
@@ -86,14 +86,14 @@ func newClient(options ClientOptions) (Client, error) {
 	}
 	c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
 	c.lookupService = internal.NewLookupService(c.rpcClient, url)
-	c.handlers = make(map[internal.Closable]bool)
+	c.handlers = internal.NewClientHandlers()
 	return c, nil
 }
 
 func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
 	producer, err := newProducer(c, &options)
 	if err == nil {
-		c.handlers[producer] = true
+		c.handlers.Add(producer)
 	}
 	return producer, err
 }
@@ -103,7 +103,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
 	if err != nil {
 		return nil, err
 	}
-	c.handlers[consumer] = true
+	c.handlers.Add(consumer)
 	return consumer, nil
 }
 
@@ -145,9 +145,7 @@ func (c *client) TopicPartitions(topic string) ([]string, error) {
 }
 
 func (c *client) Close() {
-	for handler := range c.handlers {
-		handler.Close()
-	}
+	c.handlers.Close()
 }
 
 func (c *client) namespaceTopics(namespace string) ([]string, error) {
diff --git a/pulsar/internal/client_handlers.go b/pulsar/internal/client_handlers.go
new file mode 100644
index 0000000..1ecfdd9
--- /dev/null
+++ b/pulsar/internal/client_handlers.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "sync"
+
+// ClientHandlers is a simple concurrent-safe set of Closable handlers for the client type
+type ClientHandlers struct {
+	handlers map[Closable]bool // registered handlers; Add stores each one with the value true
+	l        *sync.RWMutex     // guards handlers; taken by Add, Val and Close
+}
+
+func NewClientHandlers() ClientHandlers { // returns an empty, ready-to-use ClientHandlers
+	return ClientHandlers{
+		handlers: map[Closable]bool{}, // non-nil so Add can write to it immediately
+		l:        &sync.RWMutex{},     // pointer, so copies of the returned value share one lock
+	}
+}
+func (h *ClientHandlers) Add(c Closable) { // Add registers c under the write lock
+	h.l.Lock()
+	defer h.l.Unlock()
+	h.handlers[c] = true // adding a handler that is already present leaves the map unchanged
+}
+func (h *ClientHandlers) Val(c Closable) bool { // Val reports whether c has been added
+	h.l.RLock() // read lock only: the map is not modified here
+	defer h.l.RUnlock()
+	return h.handlers[c] // false (the map's zero value) for a handler that was never added
+}
+
+func (h *ClientHandlers) Close() { // Close calls Close on every registered handler
+	h.l.Lock() // write lock is held for the whole loop, so Add and Val block until it finishes
+	defer h.l.Unlock()
+
+	for handler := range h.handlers {
+		handler.Close() // entries are not removed from the map afterwards
+	}
+}
diff --git a/pulsar/internal/client_handlers_test.go b/pulsar/internal/client_handlers_test.go
new file mode 100644
index 0000000..baaf899
--- /dev/null
+++ b/pulsar/internal/client_handlers_test.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestClientHandlers(t *testing.T) { // exercises NewClientHandlers, Add, Val and Close in sequence
+	h := NewClientHandlers()
+	assert.NotNil(t, h.l)
+	assert.Equal(t, h.handlers, map[Closable]bool{}) // a fresh instance starts with an empty map
+
+	closable := &testClosable{false} // starts with closed == false
+	h.Add(closable)
+	assert.True(t, h.Val(closable))
+
+	h.Close() // expected to call closable.Close, which sets closed to true
+	t.Log("closable is: ", closable.closed)
+	assert.True(t, closable.closed)
+}
+
+type testClosable struct { // test double that records whether Close was called
+	closed bool // set to true by Close
+}
+
+func (t *testClosable) Close() { // pointer receiver, so the caller's value observes the change
+	t.closed = true
+}