You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/30 08:15:14 UTC
[rocketmq-clients] branch master updated: Golang fix data races (#428)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 3a7adcb9 Golang fix data races (#428)
3a7adcb9 is described below
commit 3a7adcb942b9e9410a12fe475aa11ae758636077
Author: Paweł Biegun <69...@users.noreply.github.com>
AuthorDate: Thu Mar 30 10:15:08 2023 +0200
Golang fix data races (#428)
* fix memory safety in rpc client
* use atomic bool instead of bool in DefaultClientMeter
* use atomic bool as validateMessageType in producer options
* use atomic int for maxBodySizeBytes in producerOptions
* fix data race for seconds in message id codec
* fix type mismatch in publishing_message.go
* verify that there is no data race in defaultClientMeterProvider.reset
* add license to metric_test.go
---
golang/message_id_codec.go | 10 ++++++----
golang/metric.go | 14 ++++++++------
golang/metric_test.go | 36 ++++++++++++++++++++++++++++++++++++
golang/producer.go | 6 ++++--
golang/producer_options.go | 12 +++++++-----
golang/publishing_message.go | 2 +-
golang/rpc_client.go | 36 ++++++++++++++++++++++++++++++++----
7 files changed, 94 insertions(+), 22 deletions(-)
diff --git a/golang/message_id_codec.go b/golang/message_id_codec.go
index 579b5132..99b04447 100644
--- a/golang/message_id_codec.go
+++ b/golang/message_id_codec.go
@@ -27,6 +27,8 @@ import (
"sync/atomic"
"time"
+ uberatomic "go.uber.org/atomic"
+
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
)
@@ -83,7 +85,7 @@ var (
processFixedStringV1 string
secondsSinceCustomEpoch int64
secondsStartTimestamp int64
- seconds int64
+ seconds uberatomic.Int64
sequence int32
)
@@ -113,7 +115,7 @@ func init() {
secondsSinceCustomEpoch = time.Now().Unix() - time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
// TODO Implement System.nanoTime() in golang, see https://github.com/golang/go/issues/16658
secondsStartTimestamp = time.Now().Unix()
- seconds = deltaSeconds()
+ seconds.Store(deltaSeconds())
sequence = -1
@@ -135,8 +137,8 @@ func (mic *messageIdCodec) NextMessageId() MessageId {
var buffer bytes.Buffer
deltaSeconds := deltaSeconds()
- if seconds != deltaSeconds {
- seconds = deltaSeconds
+ if seconds.Load() != deltaSeconds {
+ seconds.Store(deltaSeconds)
}
if err := binary.Write(&buffer, binary.BigEndian, uint32(deltaSeconds)); err != nil {
diff --git a/golang/metric.go b/golang/metric.go
index 1be579fc..e7f4a156 100644
--- a/golang/metric.go
+++ b/golang/metric.go
@@ -22,6 +22,8 @@ import (
"sync"
"time"
+ "go.uber.org/atomic"
+
"contrib.go.opencensus.io/exporter/ocagent"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -63,14 +65,14 @@ func init() {
}
type defaultClientMeter struct {
- enabled bool
+ enabled atomic.Bool
endpoints *v2.Endpoints
ocaExporter view.Exporter
mutex sync.Mutex
}
func (dcm *defaultClientMeter) shutdown() {
- if !dcm.enabled {
+ if !dcm.enabled.Load() {
return
}
dcm.mutex.Lock()
@@ -88,7 +90,7 @@ func (dcm *defaultClientMeter) shutdown() {
}
func (dcm *defaultClientMeter) start() {
- if !dcm.enabled {
+ if !dcm.enabled.Load() {
return
}
view.RegisterExporter(dcm.ocaExporter)
@@ -96,7 +98,7 @@ func (dcm *defaultClientMeter) start() {
var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
return &defaultClientMeter{
- enabled: on,
+ enabled: *atomic.NewBool(on),
endpoints: endpoints,
ocaExporter: exporter,
}
@@ -163,7 +165,7 @@ func (dmmi *defaultMessageMeterInterceptor) doAfter(messageHookPoints MessageHoo
return nil
}
func (dcmp *defaultClientMeterProvider) isEnabled() bool {
- return dcmp.clientMeter.enabled
+ return dcmp.clientMeter.enabled.Load()
}
func (dcmp *defaultClientMeterProvider) getClientID() string {
return dcmp.client.GetClientID()
@@ -172,7 +174,7 @@ func (dcmp *defaultClientMeterProvider) Reset(metric *v2.Metric) {
dcmp.globalMutex.Lock()
defer dcmp.globalMutex.Unlock()
endpoints := metric.GetEndpoints()
- if dcmp.clientMeter.enabled && metric.GetOn() && utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
+ if dcmp.clientMeter.enabled.Load() && metric.GetOn() && utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
sugarBaseLogger.Infof("metric settings is satisfied by the current message meter, clientId=%s", dcmp.client.GetClientID())
return
}
diff --git a/golang/metric_test.go b/golang/metric_test.go
new file mode 100644
index 00000000..6dd0b7a3
--- /dev/null
+++ b/golang/metric_test.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+ "testing"
+
+ v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+// This test is designed to verify there is no data race in dcmp.Reset
+func TestDefaultClientMeterProviderResetNoDataRace(t *testing.T) {
+ cli := BuildCLient(t)
+ metric := &v2.Metric{On: false, Endpoints: cli.accessPoint}
+
+ for i := 0; i < 5; i++ {
+ go func() {
+ cli.clientMeterProvider.Reset(metric)
+ }()
+ }
+}
diff --git a/golang/producer.go b/golang/producer.go
index 54e9412f..f2fbf590 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -24,6 +24,8 @@ import (
"sync"
"time"
+ "go.uber.org/atomic"
+
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
"google.golang.org/protobuf/types/known/durationpb"
@@ -148,8 +150,8 @@ var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error)
},
},
requestTimeout: p.cli.opts.timeout,
- validateMessageType: true,
- maxBodySizeBytes: 4 * 1024 * 1024,
+ validateMessageType: *atomic.NewBool(true),
+ maxBodySizeBytes: *atomic.NewInt32(4 * 1024 * 1024),
}
for _, topic := range po.topics {
topicResource := &v2.Resource{
diff --git a/golang/producer_options.go b/golang/producer_options.go
index fd9ba29a..640f1e30 100644
--- a/golang/producer_options.go
+++ b/golang/producer_options.go
@@ -22,6 +22,8 @@ import (
"sync"
"time"
+ "go.uber.org/atomic"
+
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
"google.golang.org/protobuf/types/known/durationpb"
)
@@ -94,8 +96,8 @@ type producerSettings struct {
clientType v2.ClientType
retryPolicy *v2.RetryPolicy
requestTimeout time.Duration
- validateMessageType bool
- maxBodySizeBytes int
+ validateMessageType atomic.Bool
+ maxBodySizeBytes atomic.Int32
}
func (ps *producerSettings) GetClientID() string {
@@ -114,7 +116,7 @@ func (ps *producerSettings) GetRequestTimeout() time.Duration {
return ps.requestTimeout
}
func (ps *producerSettings) IsValidateMessageType() bool {
- return ps.validateMessageType
+ return ps.validateMessageType.Load()
}
func (ps *producerSettings) toProtobuf() *v2.Settings {
@@ -160,8 +162,8 @@ func (ps *producerSettings) applySettingsCommand(settings *v2.Settings) error {
}
}
}
- ps.validateMessageType = v.Publishing.GetValidateMessageType()
- ps.maxBodySizeBytes = int(v.Publishing.GetMaxBodySize())
+ ps.validateMessageType.Store(v.Publishing.GetValidateMessageType())
+ ps.maxBodySizeBytes.Store(v.Publishing.GetMaxBodySize())
return nil
}
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index dbf8d2fe..456202ab 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -41,7 +41,7 @@ var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnab
msg: msg,
}
- maxBodySizeBytes := settings.maxBodySizeBytes
+ maxBodySizeBytes := int(settings.maxBodySizeBytes.Load())
length := len(msg.Body)
if length > maxBodySizeBytes {
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 6a593b37..12a78e4f 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"sync"
- "sync/atomic"
"time"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -52,7 +51,6 @@ var _ = RpcClient(&rpcClient{})
type rpcClient struct {
opts rpcClientOptions
- queue atomic.Value
mux sync.Mutex
conn ClientConn
msc v2.MessagingServiceClient
@@ -60,6 +58,14 @@ type rpcClient struct {
activityNanoTime time.Time
}
+/*
+ * Memory safety:
+ * - target, opts, msc are never mutated after creation, so they can be read concurrently without locking the mutex
+ * - activityNanoTime is read and written across goroutines, so the mutex must be locked before any read or write
+ * - conn is autogenerated and designed to be accessed concurrently, but closing the connection may not be atomic (I think so, but I haven't been able to confirm it),
+ * so let's assume that locking the mutex is required to access or mutate it.
+ */
+
var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) {
rc := &rpcClient{
target: target,
@@ -84,25 +90,35 @@ func (rc *rpcClient) GetTarget() string {
}
func (rc *rpcClient) idleDuration() time.Duration {
- return time.Since(rc.activityNanoTime)
+ rc.mux.Lock()
+ duration := time.Since(rc.activityNanoTime)
+ rc.mux.Unlock()
+ return duration
}
func (rc *rpcClient) Close() {}
func (rc *rpcClient) GracefulStop() error {
+ rc.mux.Lock()
sugarBaseLogger.Warnf("close rpc client, target=%s", rc.target)
- return rc.conn.Close()
+ closeResult := rc.conn.Close()
+ rc.mux.Unlock() // release the mutex taken above; sync.Mutex is not reentrant, so a second Lock() here would deadlock
+ return closeResult
}
func (rc *rpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.QueryRoute(ctx, request)
sugarBaseLogger.Debugf("queryRoute request: %v, response: %v, err: %v", request, resp, err)
return resp, err
}
func (rc *rpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.SendMessage(ctx, request)
sugarBaseLogger.Debugf("sendMessage request: %v, response: %v, err: %v", request, resp, err)
return resp, err
@@ -113,42 +129,54 @@ func (rc *rpcClient) Telemetry(ctx context.Context) (v2.MessagingService_Telemet
}
func (rc *rpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.EndTransaction(ctx, request)
sugarBaseLogger.Debugf("endTransaction request: %v, response: %v, err: %v", request, resp, err)
return resp, err
}
func (rc *rpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.Heartbeat(ctx, request)
sugarBaseLogger.Debugf("heartBeat request: %v, response: %v, err: %v", request, resp, err)
return resp, err
}
func (rc *rpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.NotifyClientTermination(ctx, request)
sugarBaseLogger.Debugf("notifyClientTermination request: %v, response: %v, err: %v", request, resp, err)
return resp, err
}
func (rc *rpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.ReceiveMessage(ctx, request)
sugarBaseLogger.Debugf("receiveMessage request: %v, err: %v", request, err)
return resp, err
}
func (rc *rpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.AckMessage(ctx, request)
sugarBaseLogger.Debugf("ackMessage request: %v, response: %v, err: %v", request, resp, err)
return resp, err
}
func (rc *rpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error) {
+ rc.mux.Lock()
rc.activityNanoTime = time.Now()
+ rc.mux.Unlock()
resp, err := rc.msc.ChangeInvisibleDuration(ctx, request)
sugarBaseLogger.Debugf("changeInvisibleDuration request: %v, response: %v, err: %v", request, resp, err)
return resp, err