You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/31 06:56:50 UTC

[incubator-inlong] branch master updated: [INLONG-2084][Bug]A bug in the Go SDK demo, and the API result class is not clear enough (#2086)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f5fd3a2  [INLONG-2084][Bug]A bug in the Go SDK demo, and the API result class is not clear enough (#2086)
f5fd3a2 is described below

commit f5fd3a2afee6b6e5a3f234bc53ec55c3a9114049
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Dec 31 14:55:37 2021 +0800

    [INLONG-2084][Bug]A bug in the Go SDK demo, and the API result class is not clear enough (#2086)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-twins/tubemq-client-go/README.md  |  4 ++--
 .../tubemq-client-go/client/consumer.go             |  9 ++++++++-
 .../tubemq-client-go/client/consumer_impl.go        | 21 +++++++++++----------
 .../tubemq-client-go/client/version.go              |  2 +-
 .../tubemq-client-go/example/consumer.go            |  2 +-
 .../example/multi_routine_consumer.go               |  2 +-
 6 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md
index 43d87dd..73e1f96 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md
@@ -61,7 +61,7 @@ defer c.Close()
 
 cr, err := c.GetMessage()
 // need to confirm by yourself.
-cr, err = c.Confirm(cr.ConfirmContext, true)
+_, err = c.Confirm(cr.ConfirmContext, true)
 
 for _, msg := range cr.Messages {
 	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
@@ -89,7 +89,7 @@ defer c.Close()
 
 cr, err := c.GetMessage()
 // need to confirm by yourself.
-cr, err = c.Confirm(cr.ConfirmContext, true)
+_, err = c.Confirm(cr.ConfirmContext, true)
 
 for _, msg := range cr.Messages {
 	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 415c9bf..1238918 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -31,6 +31,13 @@ type ConsumerResult struct {
 	Messages       []*Message
 }
 
+// ConfirmResult is the result of confirming the consumption of a message.
+type ConfirmResult struct {
+	TopicName      string
+	ConfirmContext string
+	PeerInfo       *PeerInfo
+}
+
 var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
@@ -38,7 +45,7 @@ type Consumer interface {
 	// GetMessage receive a single message.
 	GetMessage() (*ConsumerResult, error)
 	// Confirm the consumption of a message.
-	Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
+	Confirm(confirmContext string, consumed bool) (*ConfirmResult, error)
 	// GetCurrConsumedInfo returns the consumptions of the consumer.
 	GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
 	// Close closes the consumer client and release the resources.
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index ca9cca0..30926a8 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -249,21 +249,21 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
 		}
 		return nil, err
 	}
-	cs := &ConsumerResult{
+	cr := &ConsumerResult{
 		TopicName:      partition.GetTopic(),
 		ConfirmContext: confirmContext,
 		PeerInfo:       pi,
 	}
 	msgs, err := c.processGetMessageRspB2C(pi, isFiltered, partition, confirmContext, rsp)
 	if err != nil {
-		return cs, err
+		return cr, err
 	}
-	cs.Messages = msgs
-	return cs, err
+	cr.Messages = msgs
+	return cr, err
 }
 
 // Confirm implementation of TubeMQ consumer.
-func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConfirmResult, error) {
 	partitionKey, bookedTime, err := util.ParseConfirmContext(confirmContext)
 	if err != nil {
 		return nil, errs.New(errs.RetBadRequest, "illegel confirm_context content: unregular confirm_context value format")
@@ -294,15 +294,16 @@ func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResul
 		CurrOffset:   rsp.GetCurrOffset(),
 		MaxOffset:    rsp.GetMaxOffset(),
 	}
-	cs := &ConsumerResult{
-		TopicName: partition.GetTopic(),
-		PeerInfo:  pi,
+	cr := &ConfirmResult{
+		ConfirmContext: confirmContext,
+		TopicName:      partition.GetTopic(),
+		PeerInfo:       pi,
 	}
 	if !rsp.GetSuccess() {
-		return cs, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+		return cr, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 	}
 	c.rmtDataCache.BookPartitionInfo(partitionKey, rsp.GetCurrOffset(), rsp.GetMaxOffset())
-	return cs, err
+	return cr, err
 }
 
 func (c *consumer) sendConfirmReq2Broker(partition *metadata.Partition, consumed bool) (*protocol.CommitOffsetResponseB2C, error) {
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
index fad542e..a9c996c 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
 package client
 
 const (
-	tubeMQClientVersion = "0.1.0"
+	tubeMQClientVersion = "0.1.1"
 )
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go
index cb432dc..a087ed7 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go
@@ -60,7 +60,7 @@ func main() {
 			log.Errorf("Get message error %s", err.Error())
 			continue
 		}
-		cr, err = c.Confirm(cr.ConfirmContext, true)
+		_, err = c.Confirm(cr.ConfirmContext, true)
 		if err != nil {
 			log.Errorf("Confirm error %s", err.Error())
 			continue
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
index d5a2a52..6aa9e0d 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
@@ -70,7 +70,7 @@ func main() {
 					log.Errorf("Go routine %d, Get message error %s", i, err.Error())
 					continue
 				}
-				cr, err = c.Confirm(cr.ConfirmContext, true)
+				_, err = c.Confirm(cr.ConfirmContext, true)
 				if err != nil {
 					log.Errorf("Go routine %d, Confirm error %s", i, err.Error())
 					continue