You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/15 06:31:23 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1558]Go SDK multi goroutine consumer example (#1559)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 758edd7  [INLONG-1558]Go SDK multi goroutine consumer example (#1559)
758edd7 is described below

commit 758edd7e87fdf9c4858da40f25d507200f442c10
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed Sep 15 14:31:16 2021 +0800

    [INLONG-1558]Go SDK multi goroutine consumer example (#1559)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../example/multi_routine_consumer.go              | 100 +++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go b/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
new file mode 100644
index 0000000..dd257a5
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
+)
+
+var lastPrintTime int64
+var lastMsgCount int64
+var lastPrintCount int64
+
+func main() {
+	// Example for using config directly
+	// cfg := config.NewDefaultConfig()
+	// For topic filter
+	// cfg.Consumer.TopicFilters = map[string][]string{"topic1": {"filter1", "filter2"}, "topic2": {"filter3", "filter4"}}
+	// For part offset
+	// cfg.Consumer.PartitionOffset = map[string]int64{"181895251:test_1": 0, "181895251:test_2": 10}
+
+	// Example for parseAddress
+	cfg, err := config.ParseAddress("9.23.27.160:8099?topics=test_1&group=test_group")
+	// For topic filter
+	// cfg, err := config.ParseAddress("9.23.27.160:8099?topics=Topic1@12312323,1212;Topic2@121212,2321323&group=test_group")
+	if err != nil {
+		log.Errorf("Failed to parse address", err.Error())
+		panic(err)
+	}
+	c, err := client.NewConsumer(cfg)
+	if err != nil {
+		log.Errorf("new consumer error %s", err.Error())
+		panic(err)
+	}
+	numGoRoutine := 15
+	var wg sync.WaitGroup
+	for i := 0; i < numGoRoutine; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			start := time.Now()
+			for {
+				elapsed := time.Since(start)
+				if elapsed >= 10*time.Minute {
+					break
+				}
+				cr, err := c.GetMessage()
+				if err != nil {
+					log.Errorf("Go routine %d, Get message error %s", i, err.Error())
+					continue
+				}
+				cr, err = c.Confirm(cr.ConfirmContext, true)
+				if err != nil {
+					log.Errorf("Go routine %d, Confirm error %s", i, err.Error())
+					continue
+				}
+				reportMsg(int64(len(cr.Messages)))
+			}
+			log.Infof("Go routine %d finished", i)
+		}(i)
+	}
+	wg.Wait()
+	err = c.Close()
+	if err != nil {
+		log.Errorf("Close err %s", err.Error())
+		panic(err)
+	}
+}
+
+func reportMsg(cnt int64) {
+	lastTime := atomic.LoadInt64(&lastPrintTime)
+	atomic.AddInt64(&lastMsgCount, cnt)
+	curCount := atomic.LoadInt64(&lastMsgCount)
+	curTime := time.Now().UnixNano() / int64(time.Second)
+	if curCount-atomic.LoadInt64(&lastPrintCount) >= 50000 || curTime-lastTime > 90 {
+		atomic.StoreInt64(&lastPrintTime, curTime)
+		log.Infof("Current time %d Current message count=%d", curTime, atomic.LoadInt64(&lastMsgCount))
+		atomic.StoreInt64(&lastPrintCount, curCount)
+	}
+}