You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/31 06:56:50 UTC
[incubator-inlong] branch master updated: [INLONG-2084][Bug]A bug in the Go SDK demo, and the API result class is not clear enough (#2086)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f5fd3a2 [INLONG-2084][Bug]A bug in the Go SDK demo, and the API result class is not clear enough (#2086)
f5fd3a2 is described below
commit f5fd3a2afee6b6e5a3f234bc53ec55c3a9114049
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Dec 31 14:55:37 2021 +0800
[INLONG-2084][Bug]A bug in the Go SDK demo, and the API result class is not clear enough (#2086)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-twins/tubemq-client-go/README.md | 4 ++--
.../tubemq-client-go/client/consumer.go | 9 ++++++++-
.../tubemq-client-go/client/consumer_impl.go | 21 +++++++++++----------
.../tubemq-client-go/client/version.go | 2 +-
.../tubemq-client-go/example/consumer.go | 2 +-
.../example/multi_routine_consumer.go | 2 +-
6 files changed, 24 insertions(+), 16 deletions(-)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md
index 43d87dd..73e1f96 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md
@@ -61,7 +61,7 @@ defer c.Close()
cr, err := c.GetMessage()
// need to confirm by yourself.
-cr, err = c.Confirm(cr.ConfirmContext, true)
+_, err = c.Confirm(cr.ConfirmContext, true)
for _, msg := range cr.Messages {
fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
@@ -89,7 +89,7 @@ defer c.Close()
cr, err := c.GetMessage()
// need to confirm by yourself.
-cr, err = c.Confirm(cr.ConfirmContext, true)
+_, err = c.Confirm(cr.ConfirmContext, true)
for _, msg := range cr.Messages {
fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 415c9bf..1238918 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -31,6 +31,13 @@ type ConsumerResult struct {
Messages []*Message
}
+// ConfirmResult of a consumption.
+type ConfirmResult struct {
+ TopicName string
+ ConfirmContext string
+ PeerInfo *PeerInfo
+}
+
var clientID uint64
// Consumer is an interface that abstracts behavior of TubeMQ's consumer
@@ -38,7 +45,7 @@ type Consumer interface {
// GetMessage receive a single message.
GetMessage() (*ConsumerResult, error)
// Confirm the consumption of a message.
- Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
+ Confirm(confirmContext string, consumed bool) (*ConfirmResult, error)
// GetCurrConsumedInfo returns the consumptions of the consumer.
GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
// Close closes the consumer client and release the resources.
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index ca9cca0..30926a8 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -249,21 +249,21 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
}
return nil, err
}
- cs := &ConsumerResult{
+ cr := &ConsumerResult{
TopicName: partition.GetTopic(),
ConfirmContext: confirmContext,
PeerInfo: pi,
}
msgs, err := c.processGetMessageRspB2C(pi, isFiltered, partition, confirmContext, rsp)
if err != nil {
- return cs, err
+ return cr, err
}
- cs.Messages = msgs
- return cs, err
+ cr.Messages = msgs
+ return cr, err
}
// Confirm implementation of TubeMQ consumer.
-func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConfirmResult, error) {
partitionKey, bookedTime, err := util.ParseConfirmContext(confirmContext)
if err != nil {
return nil, errs.New(errs.RetBadRequest, "illegel confirm_context content: unregular confirm_context value format")
@@ -294,15 +294,16 @@ func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResul
CurrOffset: rsp.GetCurrOffset(),
MaxOffset: rsp.GetMaxOffset(),
}
- cs := &ConsumerResult{
- TopicName: partition.GetTopic(),
- PeerInfo: pi,
+ cr := &ConfirmResult{
+ ConfirmContext: confirmContext,
+ TopicName: partition.GetTopic(),
+ PeerInfo: pi,
}
if !rsp.GetSuccess() {
- return cs, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+ return cr, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
c.rmtDataCache.BookPartitionInfo(partitionKey, rsp.GetCurrOffset(), rsp.GetMaxOffset())
- return cs, err
+ return cr, err
}
func (c *consumer) sendConfirmReq2Broker(partition *metadata.Partition, consumed bool) (*protocol.CommitOffsetResponseB2C, error) {
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
index fad542e..a9c996c 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
package client
const (
- tubeMQClientVersion = "0.1.0"
+ tubeMQClientVersion = "0.1.1"
)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go
index cb432dc..a087ed7 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/consumer.go
@@ -60,7 +60,7 @@ func main() {
log.Errorf("Get message error %s", err.Error())
continue
}
- cr, err = c.Confirm(cr.ConfirmContext, true)
+ _, err = c.Confirm(cr.ConfirmContext, true)
if err != nil {
log.Errorf("Confirm error %s", err.Error())
continue
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
index d5a2a52..6aa9e0d 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
@@ -70,7 +70,7 @@ func main() {
log.Errorf("Go routine %d, Get message error %s", i, err.Error())
continue
}
- cr, err = c.Confirm(cr.ConfirmContext, true)
+ _, err = c.Confirm(cr.ConfirmContext, true)
if err != nil {
log.Errorf("Go routine %d, Confirm error %s", i, err.Error())
continue