You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/24 09:32:42 UTC
[incubator-inlong] 02/03: Add license and construct context in each
rpc request.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 501b6be59173072d4cfe27c1d97bc169af9feef9
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon May 24 14:31:02 2021 +0800
Add license and construct context in each rpc request.
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/config/config.go | 20 ++++++++++++++++++++
tubemq-client-twins/tubemq-client-go/rpc/broker.go | 22 +++++++++++++++++-----
tubemq-client-twins/tubemq-client-go/rpc/client.go | 6 +-----
tubemq-client-twins/tubemq-client-go/rpc/master.go | 15 ++++++++++++---
4 files changed, 50 insertions(+), 13 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 999d87a..f290f07 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -1,10 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package config defines all the TubeMQ configuration options.
package config
import (
"time"
)
+// Config defines multiple configuration options.
type Config struct {
+ // Net is the namespace for network-level properties used by Broker and Master.
Net struct {
// How long to wait for a response.
ReadTimeout time.Duration
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index aa7b831..b2b614d 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -18,6 +18,8 @@
package rpc
import (
+ "context"
+
"github.com/golang/protobuf/proto"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
@@ -84,7 +86,9 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -127,7 +131,9 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -170,7 +176,9 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -211,7 +219,9 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -257,7 +267,9 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 79a446e..46961cd 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -19,8 +19,6 @@
package rpc
import (
- "context"
-
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
@@ -57,16 +55,14 @@ type RPCClient interface {
type rpcClient struct {
pool *multiplexing.Pool
client *transport.Client
- ctx context.Context
config *config.Config
}
// New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options, ctx context.Context, config *config.Config) RPCClient {
+func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient {
return &rpcClient{
pool: pool,
client: transport.New(opts, pool),
- ctx: ctx,
config: config,
}
}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 4f9e7f9..38e2b68 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -18,6 +18,8 @@
package rpc
import (
+ "context"
+
"github.com/golang/protobuf/proto"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
@@ -89,7 +91,10 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, err.Error())
@@ -157,7 +162,9 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, err.Error())
@@ -196,7 +203,9 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- rsp, err := c.client.DoRequest(c.ctx, req)
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.DoRequest(ctx, req)
if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
if v.ResponseException != nil {
return nil, errs.New(errs.RetResponseException, err.Error())