You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/30 08:15:14 UTC

[rocketmq-clients] branch master updated: Golang fix data races (#428)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a7adcb9 Golang fix data races (#428)
3a7adcb9 is described below

commit 3a7adcb942b9e9410a12fe475aa11ae758636077
Author: Paweł Biegun <69...@users.noreply.github.com>
AuthorDate: Thu Mar 30 10:15:08 2023 +0200

    Golang fix data races (#428)
    
    * fix memory safety in rpc client
    
    * use atomic bool instead of bool in DefaultClientMeter
    
    * use atomic bool as validateMessageType in producer options
    
    * use atomic int for maxBodySizeBytes in producerOptions
    
    * fix data race for seconds in message id codec
    
    * fix type mismatch in publishing_message.go
    
    * verify that there is no data race in defaultClientMeterProvider.reset
    
    * add license to metric_test.go
---
 golang/message_id_codec.go   | 10 ++++++----
 golang/metric.go             | 14 ++++++++------
 golang/metric_test.go        | 36 ++++++++++++++++++++++++++++++++++++
 golang/producer.go           |  6 ++++--
 golang/producer_options.go   | 12 +++++++-----
 golang/publishing_message.go |  2 +-
 golang/rpc_client.go         | 36 ++++++++++++++++++++++++++++++++----
 7 files changed, 94 insertions(+), 22 deletions(-)

diff --git a/golang/message_id_codec.go b/golang/message_id_codec.go
index 579b5132..99b04447 100644
--- a/golang/message_id_codec.go
+++ b/golang/message_id_codec.go
@@ -27,6 +27,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	uberatomic "go.uber.org/atomic"
+
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
 )
 
@@ -83,7 +85,7 @@ var (
 	processFixedStringV1    string
 	secondsSinceCustomEpoch int64
 	secondsStartTimestamp   int64
-	seconds                 int64
+	seconds                 uberatomic.Int64
 	sequence                int32
 )
 
@@ -113,7 +115,7 @@ func init() {
 	secondsSinceCustomEpoch = time.Now().Unix() - time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
 	// TODO Implement System.nanoTime() in golang, see https://github.com/golang/go/issues/16658
 	secondsStartTimestamp = time.Now().Unix()
-	seconds = deltaSeconds()
+	seconds.Store(deltaSeconds())
 
 	sequence = -1
 
@@ -135,8 +137,8 @@ func (mic *messageIdCodec) NextMessageId() MessageId {
 	var buffer bytes.Buffer
 
 	deltaSeconds := deltaSeconds()
-	if seconds != deltaSeconds {
-		seconds = deltaSeconds
+	if seconds.Load() != deltaSeconds {
+		seconds.Store(deltaSeconds)
 	}
 
 	if err := binary.Write(&buffer, binary.BigEndian, uint32(deltaSeconds)); err != nil {
diff --git a/golang/metric.go b/golang/metric.go
index 1be579fc..e7f4a156 100644
--- a/golang/metric.go
+++ b/golang/metric.go
@@ -22,6 +22,8 @@ import (
 	"sync"
 	"time"
 
+	"go.uber.org/atomic"
+
 	"contrib.go.opencensus.io/exporter/ocagent"
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -63,14 +65,14 @@ func init() {
 }
 
 type defaultClientMeter struct {
-	enabled     bool
+	enabled     atomic.Bool
 	endpoints   *v2.Endpoints
 	ocaExporter view.Exporter
 	mutex       sync.Mutex
 }
 
 func (dcm *defaultClientMeter) shutdown() {
-	if !dcm.enabled {
+	if !dcm.enabled.Load() {
 		return
 	}
 	dcm.mutex.Lock()
@@ -88,7 +90,7 @@ func (dcm *defaultClientMeter) shutdown() {
 }
 
 func (dcm *defaultClientMeter) start() {
-	if !dcm.enabled {
+	if !dcm.enabled.Load() {
 		return
 	}
 	view.RegisterExporter(dcm.ocaExporter)
@@ -96,7 +98,7 @@ func (dcm *defaultClientMeter) start() {
 
 var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
 	return &defaultClientMeter{
-		enabled:     on,
+		enabled:     *atomic.NewBool(on),
 		endpoints:   endpoints,
 		ocaExporter: exporter,
 	}
@@ -163,7 +165,7 @@ func (dmmi *defaultMessageMeterInterceptor) doAfter(messageHookPoints MessageHoo
 	return nil
 }
 func (dcmp *defaultClientMeterProvider) isEnabled() bool {
-	return dcmp.clientMeter.enabled
+	return dcmp.clientMeter.enabled.Load()
 }
 func (dcmp *defaultClientMeterProvider) getClientID() string {
 	return dcmp.client.GetClientID()
@@ -172,7 +174,7 @@ func (dcmp *defaultClientMeterProvider) Reset(metric *v2.Metric) {
 	dcmp.globalMutex.Lock()
 	defer dcmp.globalMutex.Unlock()
 	endpoints := metric.GetEndpoints()
-	if dcmp.clientMeter.enabled && metric.GetOn() && utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
+	if dcmp.clientMeter.enabled.Load() && metric.GetOn() && utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
 		sugarBaseLogger.Infof("metric settings is satisfied by the current message meter, clientId=%s", dcmp.client.GetClientID())
 		return
 	}
diff --git a/golang/metric_test.go b/golang/metric_test.go
new file mode 100644
index 00000000..6dd0b7a3
--- /dev/null
+++ b/golang/metric_test.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+	"testing"
+
+	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+// TestDefaultClientMeterProviderResetNoDataRace calls dcmp.Reset from parallel subtests; run with -race to detect data races.
+func TestDefaultClientMeterProviderResetNoDataRace(t *testing.T) {
+	cli := BuildCLient(t)
+	metric := &v2.Metric{On: false, Endpoints: cli.accessPoint}
+	for i := 0; i < 5; i++ {
+		t.Run("reset", func(t *testing.T) {
+			t.Parallel()
+			cli.clientMeterProvider.Reset(metric)
+		})
+	}
+}
diff --git a/golang/producer.go b/golang/producer.go
index 54e9412f..f2fbf590 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -24,6 +24,8 @@ import (
 	"sync"
 	"time"
 
+	"go.uber.org/atomic"
+
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
 	"google.golang.org/protobuf/types/known/durationpb"
@@ -148,8 +150,8 @@ var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error)
 			},
 		},
 		requestTimeout:      p.cli.opts.timeout,
-		validateMessageType: true,
-		maxBodySizeBytes:    4 * 1024 * 1024,
+		validateMessageType: *atomic.NewBool(true),
+		maxBodySizeBytes:    *atomic.NewInt32(4 * 1024 * 1024),
 	}
 	for _, topic := range po.topics {
 		topicResource := &v2.Resource{
diff --git a/golang/producer_options.go b/golang/producer_options.go
index fd9ba29a..640f1e30 100644
--- a/golang/producer_options.go
+++ b/golang/producer_options.go
@@ -22,6 +22,8 @@ import (
 	"sync"
 	"time"
 
+	"go.uber.org/atomic"
+
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
 	"google.golang.org/protobuf/types/known/durationpb"
 )
@@ -94,8 +96,8 @@ type producerSettings struct {
 	clientType          v2.ClientType
 	retryPolicy         *v2.RetryPolicy
 	requestTimeout      time.Duration
-	validateMessageType bool
-	maxBodySizeBytes    int
+	validateMessageType atomic.Bool
+	maxBodySizeBytes    atomic.Int32
 }
 
 func (ps *producerSettings) GetClientID() string {
@@ -114,7 +116,7 @@ func (ps *producerSettings) GetRequestTimeout() time.Duration {
 	return ps.requestTimeout
 }
 func (ps *producerSettings) IsValidateMessageType() bool {
-	return ps.validateMessageType
+	return ps.validateMessageType.Load()
 }
 
 func (ps *producerSettings) toProtobuf() *v2.Settings {
@@ -160,8 +162,8 @@ func (ps *producerSettings) applySettingsCommand(settings *v2.Settings) error {
 			}
 		}
 	}
-	ps.validateMessageType = v.Publishing.GetValidateMessageType()
-	ps.maxBodySizeBytes = int(v.Publishing.GetMaxBodySize())
+	ps.validateMessageType.Store(v.Publishing.GetValidateMessageType())
+	ps.maxBodySizeBytes.Store(v.Publishing.GetMaxBodySize())
 
 	return nil
 }
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index dbf8d2fe..456202ab 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -41,7 +41,7 @@ var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnab
 		msg: msg,
 	}
 
-	maxBodySizeBytes := settings.maxBodySizeBytes
+	maxBodySizeBytes := int(settings.maxBodySizeBytes.Load())
 
 	length := len(msg.Body)
 	if length > maxBodySizeBytes {
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 6a593b37..12a78e4f 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -52,7 +51,6 @@ var _ = RpcClient(&rpcClient{})
 
 type rpcClient struct {
 	opts             rpcClientOptions
-	queue            atomic.Value
 	mux              sync.Mutex
 	conn             ClientConn
 	msc              v2.MessagingServiceClient
@@ -60,6 +58,14 @@ type rpcClient struct {
 	activityNanoTime time.Time
 }
 
+/*
+ * Memory safety:
+ * - target, opts and msc are never mutated after creation, so they can be read concurrently without holding the mutex.
+ * - activityNanoTime is read and written from multiple goroutines, so the mutex must be held for every read and write.
+ * - conn is generated code designed for concurrent use, but closing it may not be atomic (not yet confirmed),
+ *   so the mutex is conservatively held whenever conn is accessed or mutated.
+ */
+
 var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) {
 	rc := &rpcClient{
 		target: target,
@@ -84,25 +90,35 @@ func (rc *rpcClient) GetTarget() string {
 }
 
 func (rc *rpcClient) idleDuration() time.Duration {
-	return time.Since(rc.activityNanoTime)
+	rc.mux.Lock()
+	duration := time.Since(rc.activityNanoTime)
+	rc.mux.Unlock()
+	return duration
 }
 
 func (rc *rpcClient) Close() {}
 
 func (rc *rpcClient) GracefulStop() error {
+	rc.mux.Lock()
 	sugarBaseLogger.Warnf("close rpc client, target=%s", rc.target)
-	return rc.conn.Close()
+	closeResult := rc.conn.Close()
+	rc.mux.Unlock()
+	return closeResult
 }
 
 func (rc *rpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.QueryRoute(ctx, request)
 	sugarBaseLogger.Debugf("queryRoute request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err
 }
 
 func (rc *rpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.SendMessage(ctx, request)
 	sugarBaseLogger.Debugf("sendMessage request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err
@@ -113,42 +129,54 @@ func (rc *rpcClient) Telemetry(ctx context.Context) (v2.MessagingService_Telemet
 }
 
 func (rc *rpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.EndTransaction(ctx, request)
 	sugarBaseLogger.Debugf("endTransaction request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err
 }
 
 func (rc *rpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.Heartbeat(ctx, request)
 	sugarBaseLogger.Debugf("heartBeat request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err
 }
 
 func (rc *rpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.NotifyClientTermination(ctx, request)
 	sugarBaseLogger.Debugf("notifyClientTermination request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err
 }
 
 func (rc *rpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.ReceiveMessage(ctx, request)
 	sugarBaseLogger.Debugf("receiveMessage request: %v, err: %v", request, err)
 	return resp, err
 }
 
 func (rc *rpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.AckMessage(ctx, request)
 	sugarBaseLogger.Debugf("ackMessage request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err
 }
 
 func (rc *rpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error) {
+	rc.mux.Lock()
 	rc.activityNanoTime = time.Now()
+	rc.mux.Unlock()
 	resp, err := rc.msc.ChangeInvisibleDuration(ctx, request)
 	sugarBaseLogger.Debugf("changeInvisibleDuration request: %v, response: %v, err: %v", request, resp, err)
 	return resp, err