Posted to notifications@skywalking.apache.org by "mrproliu (via GitHub)" <gi...@apache.org> on 2023/03/14 10:17:27 UTC

[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1135259663


##########
.github/workflows/e2e-native-kafka.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name: E2E
+
+on:
+  pull_request:
+  push:
+    branches:
+      - main
+      - kafka
+
+jobs:
+  NativeProtocolsE2E:
+    strategy:
+      matrix:
+        test:
+          - name: Native Management Kafka/JVM/Tracing/Meter/Event/Log/Profile/CDS
+            config: test/e2e/case/native-protocols-kafka/e2e.yaml

Review Comment:
   I think adding this case to the existing [e2e.yaml](https://github.com/apache/skywalking-satellite/blob/b40809512baeff56eec7b95220dee71c4fc175fd/.github/workflows/e2e-native.yaml#L29-L32) is enough.



##########
configs/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,605 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:

Review Comment:
   Why do we need another configuration file? I think Kafka is only optional, so please remove this file.



##########
plugins/forwarder/kafka/nativejvm/forwarder.go:
##########
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativejvm
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-jvm-kafka-forwarder"
+	ShowName = "Native jvm Kafka Forwarder"

Review Comment:
   ```suggestion
   	ShowName = "Native JVM Kafka Forwarder"
   ```



##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   Why doesn't the attached event message have a key? Also, will you create a PR for handling the attached event in the OAP?
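
The question about the missing key can be illustrated with a minimal sketch. It assumes the attached event carries the ID of the trace segment it belongs to and that this ID would be a reasonable key, mirroring how the segment case in the diff keys its messages. The `producerMessage` and `attachedEvent` types and the topic name are hypothetical stand-ins so the example compiles without the Kafka client library; they are not the satellite's or sarama's actual types.

```go
package main

import "fmt"

// producerMessage is a local stand-in for a Kafka producer message
// (topic, key, value). It is NOT the sarama type; it only keeps the
// sketch self-contained.
type producerMessage struct {
	Topic string
	Key   string
	Value []byte
}

// attachedEvent is a hypothetical, trimmed-down view of a span attached
// event: the trace segment it refers to plus its serialized bytes.
type attachedEvent struct {
	TraceSegmentID string
	Payload        []byte
}

// toMessage builds a keyed message for an attached event.
// Assumption: the trace segment ID is used as the key, so the event and
// its segment share a key.
func toMessage(topic string, e attachedEvent) producerMessage {
	return producerMessage{
		Topic: topic,
		Key:   e.TraceSegmentID,
		Value: e.Payload,
	}
}

func main() {
	// "example-topic" is a placeholder; the real topic is not shown in the diff.
	msg := toMessage("example-topic", attachedEvent{
		TraceSegmentID: "segment-1",
		Payload:        []byte("event"),
	})
	fmt.Println(msg.Topic, msg.Key, len(msg.Value))
}
```

With a hash-based partitioner, messages that share a key are routed to the same partition, whereas a message without a key may land on any partition. Whether the OAP side relies on that ordering is not stated in this thread.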



##########
plugins/forwarder/kafka/nativeprofile/forwarder.go:
##########
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeprofile

Review Comment:
   Please delete the comment file.



##########
test/e2e/base/base-compose-kafka.yml:
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:

Review Comment:
   The existing `base-compose.yml` is enough; please add the service there.



##########
test/e2e/case/native-protocols-kafka/e2e.yaml:
##########
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+  env: compose
+  file: docker-compose.yml
+  timeout: 1200
+  init-system-environment: ../../base/env
+  steps:
+    - name: install yq
+      command: bash test/e2e/base/scripts/prepare/setup-e2e-shell/install.sh yq
+    - name: install swctl
+      command: bash test/e2e/base/scripts/prepare/setup-e2e-shell/install.sh swctl
+    - name: install etcdctl
+      command: bash test/e2e/base/scripts/prepare/setup-e2e-shell/install.sh etcdctl
+
+trigger:
+  action: http
+  interval: 3s
+  times: 5
+  url: http://${consumer_host}:${consumer_9090}/info
+  method: POST
+
+verify:
+  # verify with retry strategy
+  retry:
+    # max retry count
+    count: 20
+    # the interval between two retries, in millisecond.
+    interval: 10s
+  cases:
+    # basic check: service list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
+      expected: expected/service.yml
+    # basic check: service metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name service_sla --service-name e2e-service-provider |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+    # basic check: service endpoint
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql endpoint list --keyword=info --service-name e2e-service-provider
+      expected: expected/service-endpoint.yml
+    # basic check: service endpoint metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name endpoint_cpm --endpoint-name POST:/info --service-name e2e-service-provider |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+
+    # native management: service instance list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name e2e-service-provider
+      expected: expected/service-instance.yml
+
+    # native jvm: service instance jvm metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --service-name e2e-service-provider --instance-name provider1 |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+
+    # native tracing: trace segment list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls
+      expected: expected/traces-list.yml
+    # native tracing: trace detail
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}')
+      expected: expected/trace-info-detail.yml
+
+    # native meter: instance meter
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_jvm_memory_used --instance-name provider1 --service-name e2e-service-provider |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+
+    # native event: event list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql event list --service-name e2e-service-provider --instance-name provider1
+      expected: expected/event-list.yml
+
+    # native log: logs list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql logs list --service-name e2e-service-provider --trace-id=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}')
+      expected: expected/logs-list.yml
+
+    # native profile: create task
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profiling trace create --service-name e2e-service-provider --endpoint-name POST:/info --start-time=$((($(date +%s)+5)*1000)) --duration=1 --min-duration-threshold=0 --dump-period=10 --max-sampling-count=9

Review Comment:
   The profile and CDS protocols are not implemented, so why do we need to check this? Please check it locally and remove the unnecessary expected files.



##########
plugins/forwarder/kafka/nativemeter/forwarder.go:
##########
@@ -0,0 +1,125 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativemeter
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/apimachinery/pkg/util/cache"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-meter-kafka-forwarder"
+	ShowName = "Native Meter Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	RoutingRuleLRUCacheSize int `mapstructure:"routing_rule_lru_cache_size"`
+	// The TTL of the LRU cache size for hosting routine rules of service instance.
+	RoutingRuleLRUCacheTTL int    `mapstructure:"routing_rule_lru_cache_ttl"`

Review Comment:
   Please delete the useless code.



##########
test/e2e/base/satellite/Dockerfile-Kafka:
##########
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM golang:1.18

Review Comment:
   Please don't create another Dockerfile for the satellite. We should only use the same image. If you just want to copy the configuration file, please use a volume.


