You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/10 06:44:38 UTC
[incubator-inlong] branch master updated: [INLONG-1783]Make topic
filters take effect in Go SDK (#1784)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 246536f [INLONG-1783]Make topic filters take effect in Go SDK (#1784)
246536f is described below
commit 246536f43cb0317a284e9924b1e24eafffd492b8
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed Nov 10 14:44:34 2021 +0800
[INLONG-1783]Make topic filters take effect in Go SDK (#1784)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/sub/info.go | 4 +++
.../tubemq-client-go/sub/info_test.go | 40 ++++++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info.go
index 6f0df64..3f4088a 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -61,9 +61,13 @@ func NewSubInfo(config *config.Config) *SubInfo {
topicFilters: config.Consumer.TopicFilters,
}
s.topicConds = make([]string, 0, len(config.Consumer.TopicFilters))
+ s.topicFilter = make(map[string]bool)
for topic, filters := range config.Consumer.TopicFilters {
cond := topic + "#"
count := 0
+ if len(filters) > 0 {
+ s.topicFilter[topic] = true
+ }
for _, filter := range filters {
if count > 0 {
cond += ","
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info_test.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info_test.go
new file mode 100644
index 0000000..fd37124
--- /dev/null
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub/info_test.go
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+ "testing"
+
+ "github.com/apache/incubator-inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewSubInfo(t *testing.T) {
+ address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic1&filters=12312323&filters=1212&topic=Topic2&filters=121212&filters=2321323&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+ c, err := config.ParseAddress(address)
+ assert.Nil(t, err)
+
+ topicFilters := make(map[string][]string)
+ topicFilters["Topic1"] = []string{"12312323", "1212"}
+ topicFilters["Topic2"] = []string{"121212", "2321323"}
+
+ s := NewSubInfo(c)
+ assert.Equal(t, s.topics, []string{"Topic1", "Topic2"})
+ assert.Equal(t, s.topicFilters, topicFilters)
+ assert.Equal(t, s.topicFilter, map[string]bool{"Topic1": true, "Topic2": true})
+}