You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/02 09:05:22 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-629]Go SDK
Close API (#489)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 1ac164c [INLONG-629]Go SDK Close API (#489)
1ac164c is described below
commit 1ac164c0bb9d8fd0f93d78835934ee0b0077ed58
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 2 17:05:16 2021 +0800
[INLONG-629]Go SDK Close API (#489)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer.go | 2 +
.../tubemq-client-go/client/consumer_impl.go | 51 ++++++++++++++++++++--
.../tubemq-client-go/client/heartbeat.go | 13 ++++++
.../tubemq-client-go/multiplexing/multiplexing.go | 14 ++++++
.../tubemq-client-go/remote/remote.go | 17 ++++++++
tubemq-client-twins/tubemq-client-go/rpc/client.go | 7 +++
6 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index a4eec0b..bb17f03 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -41,4 +41,6 @@ type Consumer interface {
Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
// GetCurrConsumedInfo returns the consumptions of the consumer.
GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+ // Close closes the consumer client and releases the resources.
+ Close() error
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 031d761..616be17 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -320,14 +320,24 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
panic("implement me")
}
+// Close shuts down the TubeMQ consumer and releases its resources.
+//
+// It closes c.done, sends the close request to the master, unregisters the
+// consumer from the brokers, stops the heartbeat timers and closes the rpc
+// client. The local cleanup is always carried out, even when the close request
+// to the master fails; in that case the master's error is returned once the
+// cleanup has finished.
+//
+// Calling Close again after it has completed is a no-op that returns nil.
+// NOTE(review): the already-closed check assumes c.done is only ever closed
+// and never sent on, and it does not make concurrent Close calls safe.
+func (c *consumer) Close() error {
+	select {
+	case <-c.done:
+		// Already closed: closing c.done a second time would panic.
+		return nil
+	default:
+	}
+	close(c.done)
+
+	// Remember the master error instead of returning right away, otherwise the
+	// broker registrations, heartbeat timers and connections would be leaked.
+	err := c.close2Master()
+	c.closeAllBrokers()
+	c.heartbeatManager.close()
+	c.client.Close()
+	return err
+}
+
func (c *consumer) processRebalanceEvent() {
for {
select {
case event, ok := <-c.rmtDataCache.EventCh:
if ok {
- if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
- break
- }
c.rmtDataCache.ClearEvent()
switch event.GetEventType() {
case metadata.Disconnect, metadata.OnlyDisconnect:
@@ -620,3 +630,38 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
}
return msgSize, msgs
}
+
+// close2Master sends the close request of this consumer to the master.
+//
+// The request is bounded by c.config.Net.ReadTimeout. It carries a node built
+// from the local host and c.master.Address, the configured consumer group and
+// a freshly generated master authenticate token, which is also stored in
+// c.subInfo. An rpc error is returned as is; an unsuccessful response is
+// converted into an error made of its error code and message.
+func (c *consumer) close2Master() error {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	localNode := &metadata.Node{}
+	localNode.SetHost(util.GetLocalHost())
+	localNode.SetAddress(c.master.Address)
+
+	groupInfo := &metadata.SubscribeInfo{}
+	groupInfo.SetGroup(c.config.Consumer.Group)
+
+	meta := &metadata.Metadata{}
+	meta.SetNode(localNode)
+	meta.SetSubscribeInfo(groupInfo)
+
+	// The certificate info is attached to the shared subInfo before the call.
+	authInfo := &protocol.AuthenticateInfo{}
+	c.genMasterAuthenticateToken(authInfo, true)
+	c.subInfo.SetMasterCertificateInfo(&protocol.MasterCertificateInfo{AuthInfo: authInfo})
+
+	resp, err := c.client.CloseRequestC2M(ctx, meta, c.subInfo)
+	switch {
+	case err != nil:
+		return err
+	case !resp.GetSuccess():
+		return errs.New(resp.GetErrCode(), resp.GetErrMsg())
+	default:
+		return nil
+	}
+}
+
+// closeAllBrokers fetches the per-broker partitions from the remote data
+// cache and passes them to unregister2Broker. Nothing is done when the cache
+// returns no partitions.
+func (c *consumer) closeAllBrokers() {
+	if brokerParts := c.rmtDataCache.GetAllClosedBrokerParts(); len(brokerParts) > 0 {
+		c.unregister2Broker(brokerParts)
+	}
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index fdd0d20..88ac12c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -234,3 +234,16 @@ func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
hm.timer.Reset(interval)
}
}
+
+// close stops every heartbeat timer and drops all registered heartbeats.
+//
+// The whole operation runs while holding h.mu, so it must never block.
+func (h *heartbeatManager) close() {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	for _, heartbeat := range h.heartbeats {
+		if !heartbeat.timer.Stop() {
+			// The timer has already fired or was stopped before. Drain a
+			// pending tick without blocking: the channel may be empty, and
+			// it is nil for timers created with time.AfterFunc, in which
+			// case a plain receive would hang forever with h.mu held.
+			select {
+			case <-heartbeat.timer.C:
+			default:
+			}
+		}
+	}
+	h.heartbeats = nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 0c4239c..82c767a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -175,6 +175,20 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
return nil, nil
}
+// Close will release all the connections.
+//
+// Every entry is removed from the pool while it is visited, so a closed
+// connection can no longer be looked up and a repeated Close does not close
+// the same channels twice (which would panic). Entries that do not hold a
+// *Connection are dropped and skipped instead of aborting the iteration, so
+// the remaining connections are still released.
+// NOTE(review): assumes p.connections is a sync.Map (or *sync.Map), which
+// allows Delete to be called from inside Range - confirm.
+func (p *Pool) Close() {
+	p.connections.Range(func(key, value interface{}) bool {
+		p.connections.Delete(key)
+		connection, ok := value.(*Connection)
+		if !ok {
+			// Keep going: returning false would stop Range and leak the
+			// connections that have not been visited yet.
+			return true
+		}
+		close(connection.done)
+		close(connection.mDone)
+		connection.conn.Close()
+		return true
+	})
+}
+
type recvReader struct {
ctx context.Context
recv chan codec.Response
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index a5310e7..c645d3e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -497,3 +497,20 @@ func (r *RmtDataCache) GetPartition(key string) *metadata.Partition {
}
return nil
}
+
+// GetAllClosedBrokerParts will return the partitions which should be closed.
+//
+// All partitions currently held in r.partitions are grouped by the broker
+// returned from GetBroker. The result is a newly allocated map, built while
+// holding r.metaMu; it is empty (never nil) when there are no partitions.
+// NOTE(review): the map is keyed by *metadata.Node, i.e. by pointer identity;
+// partitions of one broker are grouped together only if they share the same
+// node pointer - confirm against how partitions are created.
+func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	brokerPartitions := make(map[*metadata.Node][]*metadata.Partition)
+	for _, partition := range r.partitions {
+		broker := partition.GetBroker()
+		// append works on the nil slice of a missing key, and the result has
+		// to be stored back: appending to a local copy would lose every
+		// partition but the first one of each broker.
+		brokerPartitions[broker] = append(brokerPartitions[broker], partition)
+	}
+	return brokerPartitions
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index fab5bae..64bba71 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -55,6 +55,8 @@ type RPCClient interface {
HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
// CloseRequestC2M is the rpc request for a consumer to be closed to master.
CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error)
+ // Close will close the rpc client and release the resource.
+ Close()
}
// New returns a default TubeMQ rpc Client
@@ -71,6 +73,11 @@ type rpcClient struct {
config *config.Config
}
+// Close closes the rpc client by calling Close on its multiplexing pool.
+func (c *rpcClient) Close() {
+ c.pool.Close()
+}
+
func (c *rpcClient) doRequest(ctx context.Context, address string, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
rsp, err := c.client.DoRequest(ctx, address, req)
if err != nil {