You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/24 09:32:42 UTC

[incubator-inlong] 02/03: Add license and construct context in each rpc request.

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 501b6be59173072d4cfe27c1d97bc169af9feef9
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon May 24 14:31:02 2021 +0800

    Add license and construct context in each rpc request.
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/config/config.go              | 20 ++++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go | 22 +++++++++++++++++-----
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  6 +-----
 tubemq-client-twins/tubemq-client-go/rpc/master.go | 15 ++++++++++++---
 4 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 999d87a..f290f07 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -1,10 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package config defines all the TubeMQ configuration options.
 package config
 
 import (
 	"time"
 )
 
+// Config defines multiple configuration options.
 type Config struct {
+	// Net is the namespace for network-level properties used by Broker and Master.
 	Net struct {
 		// How long to wait for a response.
 		ReadTimeout time.Duration
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index aa7b831..b2b614d 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -18,6 +18,8 @@
 package rpc
 
 import (
+	"context"
+
 	"github.com/golang/protobuf/proto"
 
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
@@ -84,7 +86,9 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -127,7 +131,9 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -170,7 +176,9 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -211,7 +219,9 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
@@ -257,7 +267,9 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 79a446e..46961cd 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -19,8 +19,6 @@
 package rpc
 
 import (
-	"context"
-
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
@@ -57,16 +55,14 @@ type RPCClient interface {
 type rpcClient struct {
 	pool   *multiplexing.Pool
 	client *transport.Client
-	ctx    context.Context
 	config *config.Config
 }
 
 // New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options, ctx context.Context, config *config.Config) RPCClient {
+func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient {
 	return &rpcClient{
 		pool:   pool,
 		client: transport.New(opts, pool),
-		ctx:    ctx,
 		config: config,
 	}
 }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 4f9e7f9..38e2b68 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -18,6 +18,8 @@
 package rpc
 
 import (
+	"context"
+
 	"github.com/golang/protobuf/proto"
 
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
@@ -89,7 +91,10 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, err.Error())
@@ -157,7 +162,9 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, err.Error())
@@ -196,7 +203,9 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	rsp, err := c.client.DoRequest(c.ctx, req)
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := c.client.DoRequest(ctx, req)
 	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
 		if v.ResponseException != nil {
 			return nil, errs.New(errs.RetResponseException, err.Error())