You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/02 09:05:22 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-629]Go SDK Close API (#489)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 1ac164c  [INLONG-629]Go SDK Close API (#489)
1ac164c is described below

commit 1ac164c0bb9d8fd0f93d78835934ee0b0077ed58
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 2 17:05:16 2021 +0800

    [INLONG-629]Go SDK Close API (#489)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go            |  2 +
 .../tubemq-client-go/client/consumer_impl.go       | 51 ++++++++++++++++++++--
 .../tubemq-client-go/client/heartbeat.go           | 13 ++++++
 .../tubemq-client-go/multiplexing/multiplexing.go  | 14 ++++++
 .../tubemq-client-go/remote/remote.go              | 17 ++++++++
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  7 +++
 6 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index a4eec0b..bb17f03 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -41,4 +41,6 @@ type Consumer interface {
 	Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
 	// GetCurrConsumedInfo returns the consumptions of the consumer.
 	GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+	// Close closes the consumer client and release the resources.
+	Close() error
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 031d761..616be17 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -320,14 +320,24 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 	panic("implement me")
 }
 
+// Close implementation of TubeMQ consumer.
+func (c *consumer) Close() error {
+	close(c.done)
+	err := c.close2Master()
+	if err != nil {
+		return err
+	}
+	c.closeAllBrokers()
+	c.heartbeatManager.close()
+	c.client.Close()
+	return nil
+}
+
 func (c *consumer) processRebalanceEvent() {
 	for {
 		select {
 		case event, ok := <-c.rmtDataCache.EventCh:
 			if ok {
-				if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
-					break
-				}
 				c.rmtDataCache.ClearEvent()
 				switch event.GetEventType() {
 				case metadata.Disconnect, metadata.OnlyDisconnect:
@@ -620,3 +630,38 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
 	}
 	return msgSize, msgs
 }
+
+func (c *consumer) close2Master() error {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(c.master.Address)
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	m.SetSubscribeInfo(sub)
+	auth := &protocol.AuthenticateInfo{}
+	c.genMasterAuthenticateToken(auth, true)
+	mci := &protocol.MasterCertificateInfo{
+		AuthInfo: auth,
+	}
+	c.subInfo.SetMasterCertificateInfo(mci)
+	rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
+	if err != nil {
+		return err
+	}
+	if !rsp.GetSuccess() {
+		return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+	}
+	return nil
+}
+
+func (c *consumer) closeAllBrokers() {
+	partitions := c.rmtDataCache.GetAllClosedBrokerParts()
+	if len(partitions) > 0 {
+		c.unregister2Broker(partitions)
+	}
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index fdd0d20..88ac12c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -234,3 +234,16 @@ func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
 		hm.timer.Reset(interval)
 	}
 }
+
+func (h *heartbeatManager) close() {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	for _, heartbeat := range h.heartbeats {
+		if !heartbeat.timer.Stop() {
+			<-heartbeat.timer.C
+		}
+		heartbeat.timer.Stop()
+	}
+	h.heartbeats = nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 0c4239c..82c767a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -175,6 +175,20 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
 	return nil, nil
 }
 
+// Close will release all the connections.
+func (p *Pool) Close() {
+	p.connections.Range(func(key, value interface{}) bool {
+		connection, ok := value.(*Connection)
+		if !ok {
+			return false
+		}
+		close(connection.done)
+		close(connection.mDone)
+		connection.conn.Close()
+		return true
+	})
+}
+
 type recvReader struct {
 	ctx  context.Context
 	recv chan codec.Response
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index a5310e7..c645d3e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -497,3 +497,20 @@ func (r *RmtDataCache) GetPartition(key string) *metadata.Partition {
 	}
 	return nil
 }
+
+// GetAllClosedBrokerParts will return the partitions which should be closed.
+func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	brokerPartitions := make(map[*metadata.Node][]*metadata.Partition)
+	for _, partition := range r.partitions {
+		partitions, ok := brokerPartitions[partition.GetBroker()]
+		if !ok {
+			brokerPartitions[partition.GetBroker()] = []*metadata.Partition{partition}
+		} else {
+			partitions = append(partitions, partition)
+		}
+	}
+	return brokerPartitions
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index fab5bae..64bba71 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -55,6 +55,8 @@ type RPCClient interface {
 	HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
 	// CloseRequestC2M is the rpc request for a consumer to be closed to master.
 	CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error)
+	// Close will close the rpc client and release the resource.
+	Close()
 }
 
 // New returns a default TubeMQ rpc Client
@@ -71,6 +73,11 @@ type rpcClient struct {
 	config *config.Config
 }
 
+// Close will release the resource of multiplexing pool.
+func (c *rpcClient) Close() {
+	c.pool.Close()
+}
+
 func (c *rpcClient) doRequest(ctx context.Context, address string, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
 	rsp, err := c.client.DoRequest(ctx, address, req)
 	if err != nil {