You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/03 07:53:07 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new c8064e2 [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
c8064e2 is described below
commit c8064e2dfff28eb9e6eeb73d49781376759db4b1
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat Jul 3 15:52:59 2021 +0800
[INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer.go | 8 ++++++--
.../tubemq-client-go/client/consumer_impl.go | 18 ++++++++++++++++--
tubemq-client-twins/tubemq-client-go/remote/remote.go | 7 +++++++
3 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index bb17f03..d10e4a6 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -27,8 +27,10 @@ type ConsumerResult struct {
messages []*Message
}
-// ConsumerOffset of a consumption,
+// ConsumerOffset of a consumption.
type ConsumerOffset struct {
+ partitionKey string
+ currOffset int64
}
var clientID uint64
@@ -40,7 +42,9 @@ type Consumer interface {
// Confirm the consumption of a message.
Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
// GetCurrConsumedInfo returns the consumptions of the consumer.
- GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+ GetCurrConsumedInfo() map[string]*ConsumerOffset
// Close closes the consumer client and release the resources.
Close() error
+ // GetClientID returns the clientID of the consumer.
+ GetClientID() string
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 616be17..8163de4 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -316,8 +316,22 @@ func parsePartitionKeyToTopic(partitionKey string) (string, error) {
}
// GetCurrConsumedInfo implementation of TubeMQ consumer.
-func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
- panic("implement me")
+func (c *consumer) GetCurrConsumedInfo() map[string]*ConsumerOffset {
+ partitionOffset := c.rmtDataCache.GetCurPartitionOffset()
+ consumedInfo := make(map[string]*ConsumerOffset, len(partitionOffset))
+ for partition, offset := range partitionOffset {
+ co := &ConsumerOffset{
+ partitionKey: partition,
+ currOffset: offset,
+ }
+ consumedInfo[partition] = co
+ }
+ return consumedInfo
+}
+
+// GetClientID implementation of TubeMQ consumer.
+func (c *consumer) GetClientID() string {
+ return c.clientID
}
// Close implementation of TubeMQ consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index c645d3e..8246a56 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -514,3 +514,10 @@ func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.
}
return brokerPartitions
}
+
+// GetCurPartitionOffset returns the partition to offset map.
+func (r *RmtDataCache) GetCurPartitionOffset() map[string]int64 {
+ r.dataBookMu.Lock()
+ defer r.dataBookMu.Unlock()
+ return r.partitionOffset
+}