You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/03 07:53:07 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new c8064e2  [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
c8064e2 is described below

commit c8064e2dfff28eb9e6eeb73d49781376759db4b1
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat Jul 3 15:52:59 2021 +0800

    [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go                |  8 ++++++--
 .../tubemq-client-go/client/consumer_impl.go           | 18 ++++++++++++++++--
 tubemq-client-twins/tubemq-client-go/remote/remote.go  |  7 +++++++
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index bb17f03..d10e4a6 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -27,8 +27,10 @@ type ConsumerResult struct {
 	messages       []*Message
 }
 
-// ConsumerOffset of a consumption,
+// ConsumerOffset of a consumption.
 type ConsumerOffset struct {
+	partitionKey string
+	currOffset   int64
 }
 
 var clientID uint64
@@ -40,7 +42,9 @@ type Consumer interface {
 	// Confirm the consumption of a message.
 	Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
 	// GetCurrConsumedInfo returns the consumptions of the consumer.
-	GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+	GetCurrConsumedInfo() map[string]*ConsumerOffset
 	// Close closes the consumer client and release the resources.
 	Close() error
+	// GetClientID returns the clientID of the consumer.
+	GetClientID() string
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 616be17..8163de4 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -316,8 +316,22 @@ func parsePartitionKeyToTopic(partitionKey string) (string, error) {
 }
 
 // GetCurrConsumedInfo implementation of TubeMQ consumer.
-func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
-	panic("implement me")
+func (c *consumer) GetCurrConsumedInfo() map[string]*ConsumerOffset {
+	partitionOffset := c.rmtDataCache.GetCurPartitionOffset()
+	consumedInfo := make(map[string]*ConsumerOffset, len(partitionOffset))
+	for partition, offset := range partitionOffset {
+		co := &ConsumerOffset{
+			partitionKey: partition,
+			currOffset:   offset,
+		}
+		consumedInfo[partition] = co
+	}
+	return consumedInfo
+}
+
+// GetClientID implementation of TubeMQ consumer.
+func (c *consumer) GetClientID() string {
+	return c.clientID
 }
 
 // Close implementation of TubeMQ consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index c645d3e..8246a56 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -514,3 +514,10 @@ func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.
 	}
 	return brokerPartitions
 }
+
+// GetCurPartitionOffset returns the partition to offset map.
+func (r *RmtDataCache) GetCurPartitionOffset() map[string]int64 {
+	r.dataBookMu.Lock()
+	defer r.dataBookMu.Unlock()
+	return r.partitionOffset
+}