You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by "zbw911 (via GitHub)" <gi...@apache.org> on 2023/03/14 07:50:30 UTC

[GitHub] [skywalking-satellite] zbw911 opened a new pull request, #128: Kafka forwards

zbw911 opened a new pull request, #128:
URL: https://github.com/apache/skywalking-satellite/pull/128

   This branch provides several forwarders such as native-tracing-kafka-forwarder, native-management-kafka-forwarder, native-jvm-kafka-forwarder, and native-meter-kafka-forwarder. However, native-profile-kafka-forwarder has not been implemented yet.
   
   1,Use case: Since the .NET Core Agent has not implemented a Kafka reporter, you can use Skywalking Satellite as a proxy to write to Kafka. However, since the OAP Kafka-fetcher-plugin has not implemented a CLR handler, the native-clr-kafka-forwarder has not been implemented in this release.
   
   2,For most scenarios, enabling only native-tracing-kafka-forwarder can meet the needs of most business scenarios, and other requests with relatively small volume can be forwarded using a gRPC client.
   
   3,I have already used it in production environment, it can support applications written in Python, .NET Core, and Java.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139507954


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   Why do you think the `if-else` is a problem? They all belong to the tracing data. For this review, the question is the Attached span hasn't a message key, not another question. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1148498585


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   @mrproliu  Help!!!
   I've been trying to do e2e testing for several days now, trying various methods such as using swctl to query with a single sentence, and also modifying service-instance.yml to adapt to the return value, but when it comes to the process list step, it always returns [].
   
   I have not been able to successfully execute the test, and the whole process has been stuck in this part, making progress extremely difficult.
   
   I hope you can help me figure out how to make this part run smoothly, and what kind of **environment or tools** I need to use. I have tried almost all the methods that I can try now.
   
   My usage is Docker Desktop, and the instance is running normally from Containers.
   
   https://github.com/apache/skywalking/tree/master/test/e2e-v2/cases/profiling/ebpf/network/h2
   Below is the original error message from the main branch:
   ```
     ERROR failed to verify the output: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=service, error:
   mismatch (-want +got):
     []interface{}{
   -       map[interface{}]interface{}{
   -               string("attributes"):   []interface{}{},
   -               string("id"):           string("c2VydmljZQ==.1_dGVzdA=="),
   -               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   -               string("language"):     string("UNKNOWN"),
   -               string("name"):         string("test"),
   -       },
   +       map[interface{}]interface{}{
   +               string("attributes"): []interface{}{
   +                       map[interface{}]interface{}{string("name"): string("Process No."), string("value"): string("1")},
   +                       map[interface{}]interface{}{string("name"): string("hostname"), string("value"): string("e7786ddc74c7")},
   +                       map[interface{}]interface{}{string("name"): string("OS Name"), string("value"): string("linux")},
   +                       map[interface{}]interface{}{string("name"): string("ipv4s"), string("value"): string("172.25.0.4")},
   +               },
   +               string("id"):           string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("language"):     string("GO"),
   +               string("name"):         string("51c8ff90cba611eda0b20242ac190004@172.25.0.4"),
   +       },
     }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1141013138


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   > It should be works well with windows.
   
   The issue was caused by LF and CRLF. To fix this,   add .sh, env, and mvnw to the .gitattributes file in Git. This will prevent them from being converted to CRLF (\r\n) when cloned.   then use dos2unix or Goland (file->file properties->Line separators->LF unix and macOS(\n)) to resolve the issue. Now it works well on WSL. This is a common problem that Windows users may encounter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139510233


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   Why close it? this issue seems not been updated. please follow my comment to fix it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139833539


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   Why there still not implemented?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139837096


##########
test/e2e/case/native-protocols-kafka/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,597 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:
+  # The log format pattern configuration.
+  log_pattern: ${SATELLITE_LOGGER_LOG_PATTERN:%time [%level][%field] - %msg}
+  # The time format pattern configuration.
+  time_pattern: ${SATELLITE_LOGGER_TIME_PATTERN:2006-01-02 15:04:05.000}
+  # The lowest level of printing allowed.
+  level: ${SATELLITE_LOGGER_LEVEL:info}
+
+# The Satellite self telemetry configuration.
+telemetry:
+  # The space concept for the deployment, such as the namespace concept in the Kubernetes.
+  cluster: ${SATELLITE_TELEMETRY_CLUSTER:satellite-cluster}
+  # The group concept for the deployment, such as the service resource concept in the Kubernetes.
+  service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
+  # The minimum running unit, such as the pod concept in the Kubernetes.
+  instance: ${SATELLITE_TELEMETRY_INSTANCE:satellite-instance}
+  # Telemetry export type, support "prometheus", "metrics_service" or "none"
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
+  # Export telemetry data through Prometheus server, only works on "export_type=prometheus".
+  prometheus:
+    # The prometheus server address.
+    address: ${SATELLITE_TELEMETRY_PROMETHEUS_ADDRESS::1234}
+    # The prometheus server metrics endpoint.
+    endpoint: ${SATELLITE_TELEMETRY_PROMETHEUS_ENDPOINT:/metrics}
+  # Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
+  metrics_service:
+    # The grpc-client plugin name, using the SkyWalking native batch meter protocol
+    client_name: ${SATELLITE_TELEMETRY_METRICS_SERVICE_CLIENT_NAME:grpc-client}
+    # The interval second for sending metrics
+    interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
+    # The prefix of telemetry metric name
+    metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+
+# The sharing plugins referenced by the specific plugins in the different pipes.
+sharing:
+  clients:
+    - plugin_name: "grpc-client"

Review Comment:
   This client is unnecessary, please delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139956883


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   Sorry, I misunderstood your question about OAP earlier. I will try to help with that later.
    
   @mrproliu  By the way, do you have detailed documentation or instructions for using e2e? 
   I've been getting errors when trying to use it in the WSL command line (ERROR Local Docker compose exited abnormally whilst running docker-compose: [--env-file /mnt/f/github/apache/skywalking-infra-e2e/examples/compose/env up -d]. exit status 1). I couldn't find a solution in the SkyWalking issue tracker either. I have to submit it to the GitHub workflow for verification, which is a bit inefficient.
   Can you confirm whether e2e can be run on WSL?
   Thank you .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139956883


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   Sorry, I misunderstood your question about OAP earlier. I will try to help with that later.
    
   By the way, do you have detailed documentation or instructions for using e2e? 
   I've been getting errors when trying to use it in the WSL command line (ERROR Local Docker compose exited abnormally whilst running docker-compose: [--env-file /mnt/f/github/apache/skywalking-infra-e2e/examples/compose/env up -d]. exit status 1). I couldn't find a solution in the SkyWalking issue tracker either. I have to submit it to the GitHub workflow for verification, which is a bit inefficient.
   Can you confirm whether e2e can be run on WSL?
   Thank you .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1140223868


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   > invalid reference format: no such image: apache/skywalking-ui:8.5.0
   
   The error message clearly shows the docker image just does not exist. You could change it in the docker-compose file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1140170654


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   > We don't test on WSL. No one in the maintainer team is using Windows. That is the point. Linux is very easy to set up. You could run from there on a Windows.
   
   I'm going to try installing a Linux desktop on Hyper-V.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1572015059

   No update for months. Closing for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139632561


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   @mrproliu Please be very careful about all the codes this PR is supposed to change. I am super concerned about it lacking fully testing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467673221

   > 
   Currently, I have performed gray testing on .NET, Java, and Python in the production environment. I am using Kafka directly for Java and Python, but the .NET agent does not support the use of Kafka. Therefore, I am using Satellite as a proxy to forward requests to Kafka for unified processing. Additionally, Kafka is necessary as a buffer, as I recently encountered a delay of approximately 1.8 billion in data due to an issue with ES, but the data was not lost. When using GRPC to receive OAP alarms, data loss also occurred. Kafka itself has value as a buffer, and there is a need for a means of converting GRPC to Kafka. When I discovered the Satellite project, I realized it was perfect for this type of scenario!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467744777

   Please fix the CI and more important, we require a Kafka involved e2e to verify this feature. @zbw911 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1141013937


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   I think ebpf agent(AttachedEvent) is highly relying on Linux. I even doubt MacOS would work.
   Don't try to depend on one Platforma, sometimes, it just doesn't work from one case to another.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137105786


##########
.github/workflows/publish-docker.yaml:
##########
@@ -20,6 +20,7 @@ on:
   push:
     branches:
       - main
+      - kafka-forwards

Review Comment:
   Should be deleted too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139839932


##########
test/e2e/case/native-protocols-kafka/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,597 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:
+  # The log format pattern configuration.
+  log_pattern: ${SATELLITE_LOGGER_LOG_PATTERN:%time [%level][%field] - %msg}
+  # The time format pattern configuration.
+  time_pattern: ${SATELLITE_LOGGER_TIME_PATTERN:2006-01-02 15:04:05.000}
+  # The lowest level of printing allowed.
+  level: ${SATELLITE_LOGGER_LEVEL:info}
+
+# The Satellite self telemetry configuration.
+telemetry:
+  # The space concept for the deployment, such as the namespace concept in the Kubernetes.
+  cluster: ${SATELLITE_TELEMETRY_CLUSTER:satellite-cluster}
+  # The group concept for the deployment, such as the service resource concept in the Kubernetes.
+  service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
+  # The minimum running unit, such as the pod concept in the Kubernetes.
+  instance: ${SATELLITE_TELEMETRY_INSTANCE:satellite-instance}
+  # Telemetry export type, support "prometheus", "metrics_service" or "none"
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
+  # Export telemetry data through Prometheus server, only works on "export_type=prometheus".
+  prometheus:
+    # The prometheus server address.
+    address: ${SATELLITE_TELEMETRY_PROMETHEUS_ADDRESS::1234}
+    # The prometheus server metrics endpoint.
+    endpoint: ${SATELLITE_TELEMETRY_PROMETHEUS_ENDPOINT:/metrics}
+  # Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
+  metrics_service:
+    # The grpc-client plugin name, using the SkyWalking native batch meter protocol
+    client_name: ${SATELLITE_TELEMETRY_METRICS_SERVICE_CLIENT_NAME:grpc-client}
+    # The interval second for sending metrics
+    interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
+    # The prefix of telemetry metric name
+    metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+
+# The sharing plugins referenced by the specific plugins in the different pipes.
+sharing:
+  clients:
+    - plugin_name: "grpc-client"
+      # The gRPC server address finder type
+      finder_type: ${SATELLITE_GRPC_CLIENT_FINDER:static}
+      # The gRPC server address (default localhost:11800).
+      server_addr: ${SATELLITE_GRPC_CLIENT:127.0.0.1:11800}
+      # The gRPC kubernetes server address finder
+      kubernetes_config:
+        # The kubernetes API server address, If not define means using in kubernetes mode to connect
+        api_server: ${SATELLITE_GRPC_CLIENT_KUBERNETES_API_SERVER:}
+        # The HTTP basic authentication credentials for the targets.
+        basic_auth:
+          # The username for auth.
+          username: ${SATELLITE_GRPC_CLIENT_KUBERNETES_BASIC_AUTH_USERNAME:}
+          # The password for auth.
+          password: ${SATELLITE_GRPC_CLIENT_KUBERNETES_BASIC_AUTH_PASSWORD:}
+          # The password file path for auth.
+          password_file: ${SATELLITE_GRPC_CLIENT_KUBERNETES_BASIC_AUTH_PASSWORD_FILE:}
+        # The bearer token for the targets.
+        bearer_token: ${SATELLITE_GRPC_CLIENT_KUBERNETES_BEARER_TOKEN:}
+        # The bearer token file for the targets.
+        bearer_token_file: ${SATELLITE_GRPC_CLIENT_KUBERNETES_BEARER_TOKEN_FILE:}
+        # HTTP proxy server to use to connect to the targets.
+        proxy_url: ${SATELLITE_GRPC_CLIENT_KUBERNETES_PROXY_URL:}
+        # Used to connect to the targets.
+        tls_config:
+          # The CA cert to use for the targets.
+          ca_file: ${SATELLITE_GRPC_CLIENT_KUBERNETES_TLS_CONFIG_CA_FILE:}
+          # The client cert file for the targets.
+          cert_file: ${SATELLITE_GRPC_CLIENT_KUBERNETES_TLS_CONFIG_CERT_FILE:}
+          # The client key file for the targets.
+          key_file: ${SATELLITE_GRPC_CLIENT_KUBERNETES_TLS_CONFIG_KEY_FILE:}
+          # Used to verify the hostname for the targets.
+          server_name: ${SATELLITE_GRPC_CLIENT_KUBERNETES_TLS_CONFIG_SERVER_NAME:}
+          # Disable target certificate validation.
+          insecure_skip_verify: ${SATELLITE_GRPC_CLIENT_KUBERNETES_TLS_CONFIG_INSECURE_SKIP_VERIFY:}
+        namespaces:
+          # Support to lookup namespaces.
+          - ${SATELLITE_GRPC_CLIENT_KUBERNETES_NAMESPACE:default}
+        # The kind of resource
+        kind: ${SATELLITE_GRPC_CLIENT_KUBERNETES_KIND:pod}
+        # The kind selector
+        selector:
+          # Label selector
+          label: ${SATELLITE_GRPC_CLIENT_KUBERNETES_SELECTOR_LABEL:}
+          # Field selector
+          field: ${SATELLITE_GRPC_CLIENT_KUBERNETES_SELECTOR_FIELD:}
+        # How to get the address exported port
+        extra_port:
+          # Resource target port
+          port: ${SATELLITE_GRPC_CLIENT_KUBERNETES_EXTRA_PORT:11800}
+      # The TLS switch
+      enable_TLS: ${SATELLITE_GRPC_ENABLE_TLS:false}
+      # The file path of client.pem. The config only works when opening the TLS switch.
+      client_pem_path: ${SATELLITE_GRPC_CLIENT_PEM_PATH:"client.pem"}
+      # The file path of client.key. The config only works when opening the TLS switch.
+      client_key_path: ${SATELLITE_GRPC_CLIENT_KEY_PATH:"client.key"}
+      # InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
+      insecure_skip_verify: ${SATELLITE_GRPC_INSECURE_SKIP_VERIFY:false}
+      # The file path oca.pem. The config only works when opening the TLS switch.
+      ca_pem_path: ${SATELLITE_grpc_CA_PEM_PATH:"ca.pem"}
+      # How frequently to check the connection(second)
+      check_period: ${SATELLITE_GRPC_CHECK_PERIOD:5}
+      # The auth value when send request
+      authentication: ${SATELLITE_GRPC_AUTHENTICATION:""}
+    - plugin_name: "kafka-client"
+      # The Kafka broker addresses (default localhost:9092). Multiple values are separated by commas.
+      brokers: ${SATELLITE_KAFKA_CLIENT_BROKERS:127.0.0.1:9092}
+      # The Kakfa version should follow this pattern, which is major_minor_veryMinor_patch.
+      version: ${SATELLITE_KAFKA_VERSION:"2.1.1"}
+      # The TLS switch
+      enable_TLS: ${SATELLITE_KAFKA_ENABLE_TLS:false}
+      # The file path of client.pem. The config only works when opening the TLS switch.
+      client_pem_path: ${SATELLITE_KAFKA_CLIENT_PEM_PATH:"client.pem"}
+      # The file path of client.key. The config only works when opening the TLS switch.
+      client_key_path: ${SATELLITE_KAFKA_CLIENT_KEY_PATH:"client.key"}
+      # The file path oca.pem. The config only works when opening the TLS switch.
+      ca_pem_path: ${SATELLITE_KAFKA_CA_PEM_PATH:"ca.pem"}
+  servers:
+    - plugin_name: "grpc-server"
+      # The address of grpc server.
+      address: ${SATELLITE_GRPC_ADDRESS:":11800"}
+      # The TLS cert file path.
+      tls_cert_file: ${SATELLITE_GRPC_TLS_CERT_FILE:""}
+      # The TLS key file path.
+      tls_key_file: ${SATELLITE_GRPC_TLS_KEY_FILE:""}
+      # To Accept Connection Limiter when reach the resource
+      accept_limit:
+        # The max CPU utilization limit
+        cpu_utilization: ${SATELLITE_GRPC_ACCEPT_LIMIT_CPU_UTILIZATION:75}
+        # The max connection count
+        connection_count: ${SATELLITE_GRPC_ACCEPT_LIMIT_CONNECTION_COUNT:4000}
+
+# The working pipe configurations.
+pipes:
+  - common_config:
+      pipe_name: logpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-native-log-receiver"
+      queue:
+        plugin_name: "memory-queue"
+        # The maximum buffer event size.
+        event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
+        # The partition count of queue.
+        partition: ${SATELLITE_QUEUE_PARTITION:4}
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations. And the time unit is millisecond.
+      flush_time: ${SATELLITE_LOGPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_LOGPIPE_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+#      client_name: grpc-client
+#      forwarders:
+#        - plugin_name: native-log-grpc-forwarder
+      client_name: kafka-client
+      forwarders:
+        - plugin_name: native-log-kafka-forwarder
+          topic: ${ATELLITE_FORWARDER_KAFKA_TOPIC_LOGGING:skywalking-logs}
+  - common_config:
+      pipe_name: managementpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-native-management-receiver"
+      queue:
+        plugin_name: "memory-queue"
+        # The maximum buffer event size.
+        event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
+        # The partition count of queue.
+        partition: ${SATELLITE_QUEUE_PARTITION:4}
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations. And the time unit is millisecond.
+      flush_time: ${SATELLITE_LOGMANAGEMENT_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_LOGMANAGEMENT_SENDER_MAX_BUFFER_SIZE:20}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_LOGMANAGEMENT_SENDER_MIN_FLUSH_EVENTS:1}
+#      client_name: grpc-client
+#      forwarders:
+#        - plugin_name: native-management-grpc-forwarder
+      client_name: kafka-client
+      forwarders:
+        - plugin_name: native-management-kafka-forwarder
+          topic: ${SATELLITE_FORWARDER_KAFKA_TOPIC_MANAGEMENT:skywalking-managements}
+  - common_config:
+      pipe_name: tracingpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-native-tracing-receiver"
+      queue:
+        plugin_name: "memory-queue"
+        # The maximum buffer event size.
+        event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
+        # The partition count of queue.
+        partition: ${SATELLITE_QUEUE_PARTITION:4}
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations. And the time unit is millisecond.
+      flush_time: ${SATELLITE_TRACINGPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_TRACINGPIPE_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+      client_name: kafka-client
+      forwarders:
+        - plugin_name: native-tracing-kafka-forwarder
+          topic: ${SATELLITE_FORWARDER_KAFKA_TOPIC_SEGMENT:skywalking-segments}
+  - common_config:

Review Comment:
   There still have many not using pipelines, please remove them, to prevent abuse due to unknown circumstances. Such as profile/CDS/event/CLR/ALS pipe, etc. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137006228


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   What blocks you? I think this is very easy to add. It is just a part of the trace. Routing like a segment should be enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139645943


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   @zbw911 please finish this issue first, then click the `resolve conversation` button. 
   
   > The existing OAP server cannot parse SpanAttachedEvent
   
   Back to my first comment on this conversation, you should also implement the relevant functions on the OAP side.
   You have already implemented the management Kafka forwarder, so you should know there are also multiple data structures in that service. In that implementation, you already use the `different message key prefixes` to identify them, you can do this in the same way. 
   You should read the OAP code related to the management Kafka receiver: https://github.com/apache/skywalking/blob/master/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java#L61-L71



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467678925

   > At present, I think we can accept him. I checked the current official agents, they are basically only supported by the gRPC protocol. The satellite itself can be used as a protocol conversion, which may be an option for agents that need to be modified on a large scale. If all Agents provide Kafka's agreement later, I think it can be deleted in satellite.
   
   OK, if that is the bar, I think we will never reach :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137070280


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   You don't need to understand all. You just need to copy and change a case of it.
   Take this, https://github.com/apache/skywalking/blob/2e04b069bfba24b1233dd66588217d08bdde7710/.github/workflows/skywalking.yaml#L523-L524.
   
   Verify relative traces, but may be not real attached events.
   
   https://github.com/apache/skywalking/blob/master/test/e2e-v2/cases/profiling/ebpf/network/network-cases.yaml#L143-L156
   
   @mrproliu What do you want to add?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137483051


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   I agree with your point of view. Let's make it run first and optimize it later. Then, let's mark it as resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1138641925


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   > Why does the attached event hasn't a key of the message? Also, will you create a PR for handling the attached event in the OAP?
   
   A TracingType currently covers two types of business requests, which has led to the current if-else problem. The setting of SniffType needs to be further detailed and each type should be a type of business.
   
   Types should be defined as more specific types rather than classifications. SniffType should be mapped to the service layer in proto.
   
   Finally, my solution is to add SpanAttachedEventType = 17 in https://github.com/apache/skywalking-goapi/blob/main/satellite/data/v1/SniffData.proto#L47.
   
   If possible, I will branch the goapi project, and then modify the skywalking-satellite project.
   
   @mrproliu  
   
   -------
   v1.SniffData_SpanAttachedEvent->SpanAttachedEventReportService:  https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe/language-agent/Tracing.proto#L233-L237
   
   v1.SniffData_Segment->TraceSegmentReportService:  https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe/language-agent/Tracing.proto#L33-L46



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1140021147


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   We don't test on WSL. No one in the maintainer team is using Windows. That is the point.  Linux is very easy to set up. You could run from there on a Windows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137107997


##########
docs/en/setup/plugins/forwarder_native-meter-kafka-forwarder.md:
##########
@@ -0,0 +1,15 @@
+# Forwarder/native-meter-kafka-forwarder
+## Description
+This is a synchronization Kafka forwarder with the SkyWalking native meter protocol.
+## DefaultConfig
+```yaml
+# The remote topic. 
+topic: "skywalking-meters"
+```
+## Configuration
+|Name|Type|Description|
+|----|----|-----------|
+| routing_rule_lru_cache_size | int |  |

Review Comment:
   Seem like you didn't run the `make gen-docs` again. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1148498585


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   @mrproliu  Help!!!
   I've been trying to do e2e testing for several days now, trying various methods such as using swctl to query with a single sentence, and also modifying service-instance.yml to adapt to the return value, but when it comes to the process list step, it always returns [].
   
   I have not been able to successfully execute the test, and the whole process has been stuck in this part, making progress extremely difficult.
   
   I hope you can help me figure out how to make this part run smoothly, and what kind of **environment or tools** I need to use. I have tried almost all the methods that I can try now.
   
   My usage is Docker Desktop, and the instance is running normally from Containers.
   
   https://github.com/apache/skywalking/tree/master/test/e2e-v2/cases/profiling/ebpf/network/h2
   Below is the original error message from the main branch:
   ```
     ERROR failed to verify the output: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=service, error:
   mismatch (-want +got):
     []interface{}{
   -       map[interface{}]interface{}{
   -               string("attributes"):   []interface{}{},
   -               string("id"):           string("c2VydmljZQ==.1_dGVzdA=="),
   -               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   -               string("language"):     string("UNKNOWN"),
   -               string("name"):         string("test"),
   -       },
   +       map[interface{}]interface{}{
   +               string("attributes"): []interface{}{
   +                       map[interface{}]interface{}{string("name"): string("Process No."), string("value"): string("1")},
   +                       map[interface{}]interface{}{string("name"): string("hostname"), string("value"): string("e7786ddc74c7")},
   +                       map[interface{}]interface{}{string("name"): string("OS Name"), string("value"): string("linux")},
   +                       map[interface{}]interface{}{string("name"): string("ipv4s"), string("value"): string("172.25.0.4")},
   +               },
   +               string("id"):           string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("language"):     string("GO"),
   +               string("name"):         string("51c8ff90cba611eda0b20242ac190004@172.25.0.4"),
   +       },
     }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1148498585


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   @mrproliu  Help!!!
   I've been trying to do e2e testing for several days now, trying various methods such as using swctl to query with a single sentence, and also modifying service-instance.yml to adapt to the return value, but when it comes to the process list step, it always returns [].
   
   I have not been able to successfully execute the test, and the whole process has been stuck in this part, making progress extremely difficult.
   
   I hope you can help me figure out how to make this part run smoothly, and what kind of environment or tools I need to use. I have tried almost all the methods that I can try now.
   
   My usage is Docker Desktop, and the instance is running normally from Containers.
   
   Below is the original error message from the main branch:
   ----------
   ```
     ERROR failed to verify the output: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=service, error:
   mismatch (-want +got):
     []interface{}{
   -       map[interface{}]interface{}{
   -               string("attributes"):   []interface{}{},
   -               string("id"):           string("c2VydmljZQ==.1_dGVzdA=="),
   -               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   -               string("language"):     string("UNKNOWN"),
   -               string("name"):         string("test"),
   -       },
   +       map[interface{}]interface{}{
   +               string("attributes"): []interface{}{
   +                       map[interface{}]interface{}{string("name"): string("Process No."), string("value"): string("1")},
   +                       map[interface{}]interface{}{string("name"): string("hostname"), string("value"): string("e7786ddc74c7")},
   +                       map[interface{}]interface{}{string("name"): string("OS Name"), string("value"): string("linux")},
   +                       map[interface{}]interface{}{string("name"): string("ipv4s"), string("value"): string("172.25.0.4")},
   +               },
   +               string("id"):           string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("language"):     string("GO"),
   +               string("name"):         string("51c8ff90cba611eda0b20242ac190004@172.25.0.4"),
   +       },
     }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467597846

   I want to argue the necessarily of this. If you need a Kafka as buffer and transparent layer between agents and OAP, why do you cost resources on satellite?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467671352

   > 
   Currently, I have performed gray testing on .NET, Java, and Python in the production environment. I am using Kafka directly for Java and Python, but the .NET agent does not support the use of Kafka. Therefore, I am using Satellite as a proxy to forward requests to Kafka for unified processing. Additionally, Kafka is necessary as a buffer, as I recently encountered a delay of approximately 1.8 billion in data due to an issue with ES, but the data was not lost. When using GRPC to receive OAP alarms, data loss also occurred. Kafka itself has value as a buffer, and there is a need for a means of converting GRPC to Kafka. When I discovered the Satellite project, I realized it was perfect for this type of scenario!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137466464


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   I'm not quite sure if it's like this. If it is, I'll continue tomorrow when I have time. Thank you. 
   https://github.com/zbw911/skywalking-satellite/blob/e86c410f1d06961f9b4be0dd26131772245f1ed2/plugins/forwarder/kafka/nativetracing/forwarder.go#L94
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137952035


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   For now, the gRPC didn't forward the same instance to the upstream, just make sure one stream send to the one upstream. 
   If Kafka wants to forward to the same partition, then, just following what I say, decode the first log in the list, which is an element of the channel, don't decode them all. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137122400


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   BTW: Have you ever tested the CPU consumption of `Unmarshal` every time you transfer data? We found that the need for meaningless encoding and decoding in each transmission will consume a lot of CPU resources, so the byte array will be transmitted directly.
   Of course, this is just an optimization item, and you can deal with it if you are interested. The principle of processing is also very simple: The downstream service is usually the same service name in one stream, and each element in the queue is also composed of the same stream, so you can take out the first element when sending and use `Unmarshal` to get the service name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1470092580

   @zbw911 I have reviewed it again, please fix the issue. And some problems marked as solved have not been solved. Please deal with them. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139969414


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   > Can you confirm whether e2e can be run on WSL?
   
   Using Linux and macOS would be much much better than WSL on Windows. That is pain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1140012240


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   Do you really not support WSL? If that's the case, then I'll have to switch to that Mac. PS: I'm not very used to using Mac! :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1148498585


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   @mrproliu  Help!!!
   I've been trying to do e2e testing for several days now, trying various methods such as using swctl to query with a single sentence, and also modifying service-instance.yml to adapt to the return value, but when it comes to the process list step, it always returns [].
   
   I have not been able to successfully execute the test, and the whole process has been stuck in this part, making progress extremely difficult.
   
   I hope you can help me figure out how to make this part run smoothly, and what kind of environment or tools I need to use. I have tried almost all the methods that I can try now.
   
   My usage is Docker Desktop, and the instance is running normally from Containers.
   
   https://github.com/apache/skywalking/tree/master/test/e2e-v2/cases/profiling/ebpf/network/h2
   Below is the original error message from the main branch:
   ```
     ERROR failed to verify the output: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=service, error:
   mismatch (-want +got):
     []interface{}{
   -       map[interface{}]interface{}{
   -               string("attributes"):   []interface{}{},
   -               string("id"):           string("c2VydmljZQ==.1_dGVzdA=="),
   -               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   -               string("language"):     string("UNKNOWN"),
   -               string("name"):         string("test"),
   -       },
   +       map[interface{}]interface{}{
   +               string("attributes"): []interface{}{
   +                       map[interface{}]interface{}{string("name"): string("Process No."), string("value"): string("1")},
   +                       map[interface{}]interface{}{string("name"): string("hostname"), string("value"): string("e7786ddc74c7")},
   +                       map[interface{}]interface{}{string("name"): string("OS Name"), string("value"): string("linux")},
   +                       map[interface{}]interface{}{string("name"): string("ipv4s"), string("value"): string("172.25.0.4")},
   +               },
   +               string("id"):           string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("instanceuuid"): string("c2VydmljZQ==.1_NTFjOGZmOTBjYmE2MTFlZGEwYjIwMjQyYWMxOTAwMDRAMTcyLjI1LjAuNA=="),
   +               string("language"):     string("GO"),
   +               string("name"):         string("51c8ff90cba611eda0b20242ac190004@172.25.0.4"),
   +       },
     }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137022542


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   I am not familiar with Rover and  cannot conduct testing. If I were to proceed without proper knowledge, it could negatively impact the quality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137103242


##########
configs/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,605 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:

Review Comment:
   Why close this? please delete this file, if you want to E2E test with this file, please move it to the test directory. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137104527


##########
.github/workflows/build-and-test.yaml:
##########
@@ -21,6 +21,7 @@ on:
   push:
     branches:
       - main
+      - kafka-forwards

Review Comment:
   This branch does not exist. please delete it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137122400


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   BTW: Have you ever tested the CPU consumption of `Unmarshal` every time you transfer data? We found that the need for meaningless encoding and decoding in each transmission will consume a lot of CPU resources, so the byte array will be transmitted directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137122400


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   BTW: Have you ever tested the CPU consumption of `Unmarshal` every time you transfer data? We found that the need for meaningless encoding and decoding in each transmission will consume a lot of CPU resources, so the byte array will be transmitted directly.
   Of course, this is just an optimization item, and you can deal with it if you are interested. The principle of processing is also very simple: downstream services usually have the same Service name in the one Stream, so when gRPC accepts data, you can decode the first Log, take out the name and save it to the queue. When kafka sends data, it can be sent according to the existing Service name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137153597


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   > Take this, https://github.com/apache/skywalking/blob/2e04b069bfba24b1233dd66588217d08bdde7710/.github/workflows/skywalking.yaml#L523-L524.
   
   I think this is a good example of the setup of the attached event testing, and the test case is https://github.com/apache/skywalking/blob/master/test/e2e-v2/cases/profiling/ebpf/network/network-cases.yaml#L136-L141.
   
   This E2E test must be running in the Linux environment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng closed pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng closed pull request #128: Kafka forwards
URL: https://github.com/apache/skywalking-satellite/pull/128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   You could find it here, https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe/language-agent/Tracing.proto#L244



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137107425


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   This is not resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137913791


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   When the current log uses gRPC to receive messages, it does not `Unmarshal`, it directly reads binary data for processing. When using gRPC to send data, directly send the current stream for ten days, because it does not need to do anything based on metadata, so he does not need to use `Umarshal` to do anything.
   So, gRPC is fine. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137913791


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   When the current log uses gRPC to receive messages, it does not `Unmarshal`, it directly reads binary data for processing. When using gRPC to send data, directly send the current stream for ten days, because it does not need to do anything based on metadata, so he does not need to use `Unmarshal` to do anything.
   So, gRPC is fine. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1135259663


##########
.github/workflows/e2e-native-kafka.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name: E2E
+
+on:
+  pull_request:
+  push:
+    branches:
+      - main
+      - kafka
+
+jobs:
+  NativeProtocolsE2E:
+    strategy:
+      matrix:
+        test:
+          - name: Native Management Kafka/JVM/Tracing/Meter/Event/Log/Profile/CDS
+            config: test/e2e/case/native-protocols-kafka/e2e.yaml

Review Comment:
   I think add this case to the existing [e2e.yaml](https://github.com/apache/skywalking-satellite/blob/b40809512baeff56eec7b95220dee71c4fc175fd/.github/workflows/e2e-native.yaml#L29-L32) is enough. 



##########
configs/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,605 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:

Review Comment:
   Why we need another configuration file, Kafka I think is only optional, please remove this file. 



##########
plugins/forwarder/kafka/nativejvm/forwarder.go:
##########
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativejvm
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-jvm-kafka-forwarder"
+	ShowName = "Native jvm Kafka Forwarder"

Review Comment:
   ```suggestion
   	ShowName = "Native JVM Kafka Forwarder"
   ```



##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   Why does the attached event hasn't a key of the message? Also, will you create a PR for handling the attached event in the OAP?



##########
plugins/forwarder/kafka/nativeprofile/forwarder.go:
##########
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeprofile

Review Comment:
   Please delete the comment file.



##########
test/e2e/base/base-compose-kafka.yml:
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:

Review Comment:
   The `base-compose.yml` is enough, please add the service there. 



##########
test/e2e/case/native-protocols-kafka/e2e.yaml:
##########
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+  env: compose
+  file: docker-compose.yml
+  timeout: 1200
+  init-system-environment: ../../base/env
+  steps:
+    - name: install yq
+      command: bash test/e2e/base/scripts/prepare/setup-e2e-shell/install.sh yq
+    - name: install swctl
+      command: bash test/e2e/base/scripts/prepare/setup-e2e-shell/install.sh swctl
+    - name: install etcdctl
+      command: bash test/e2e/base/scripts/prepare/setup-e2e-shell/install.sh etcdctl
+
+trigger:
+  action: http
+  interval: 3s
+  times: 5
+  url: http://${consumer_host}:${consumer_9090}/info
+  method: POST
+
+verify:
+  # verify with retry strategy
+  retry:
+    # max retry count
+    count: 20
+    # the interval between two retries, in millisecond.
+    interval: 10s
+  cases:
+    # basic check: service list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
+      expected: expected/service.yml
+    # basic check: service metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name service_sla --service-name e2e-service-provider |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+    # basic check: service endpoint
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql endpoint list --keyword=info --service-name e2e-service-provider
+      expected: expected/service-endpoint.yml
+    # basic check: service endpoint metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name endpoint_cpm --endpoint-name POST:/info --service-name e2e-service-provider |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+
+    # native management: service instance list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name e2e-service-provider
+      expected: expected/service-instance.yml
+
+    # native jvm: service instance jvm metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --service-name e2e-service-provider --instance-name provider1 |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+
+    # native tracing: trace segment list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls
+      expected: expected/traces-list.yml
+    # native tracing: trace detail
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}')
+      expected: expected/trace-info-detail.yml
+
+    # native meter: instance meter
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_jvm_memory_used --instance-name provider1 --service-name e2e-service-provider |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
+
+    # native event: event list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql event list --service-name e2e-service-provider --instance-name provider1
+      expected: expected/event-list.yml
+
+    # native log: logs list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql logs list --service-name e2e-service-provider --trace-id=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}')
+      expected: expected/logs-list.yml
+
+    # native profile: create task
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profiling trace create --service-name e2e-service-provider --endpoint-name POST:/info --start-time=$((($(date +%s)+5)*1000)) --duration=1 --min-duration-threshold=0 --dump-period=10 --max-sampling-count=9

Review Comment:
   The profile and CDS protocol is not implemented, so why do we need to check this? Please check it locally and remove the unnecessary expected files. 



##########
plugins/forwarder/kafka/nativemeter/forwarder.go:
##########
@@ -0,0 +1,125 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativemeter
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/apimachinery/pkg/util/cache"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-meter-kafka-forwarder"
+	ShowName = "Native Meter Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	RoutingRuleLRUCacheSize int `mapstructure:"routing_rule_lru_cache_size"`
+	// The TTL of the LRU cache size for hosting routine rules of service instance.
+	RoutingRuleLRUCacheTTL int    `mapstructure:"routing_rule_lru_cache_ttl"`

Review Comment:
   Please delete the useless code.



##########
test/e2e/base/satellite/Dockerfile-Kafka:
##########
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM golang:1.18

Review Comment:
   Please don't re-create other Dockerfile of the satellite. we should only use the same image. If you just want to copy the configuration file, please use the volume. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137140222


##########
plugins/forwarder/kafka/nativemeter/forwarder.go:
##########
@@ -0,0 +1,125 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativemeter
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/apimachinery/pkg/util/cache"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-meter-kafka-forwarder"
+	ShowName = "Native Meter Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	RoutingRuleLRUCacheSize int `mapstructure:"routing_rule_lru_cache_size"`
+	// The TTL of the LRU cache size for hosting routine rules of service instance.
+	RoutingRuleLRUCacheTTL int    `mapstructure:"routing_rule_lru_cache_ttl"`

Review Comment:
   Please fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137375878


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   Yes, there is indeed some impact on performance, although I haven't delved deeply into how much. In a previous version, I generated entities with only a few necessary fields for deserialization through a proto file. However, I found that I needed to maintain another entity. As a result, I deleted the generated files. Another ultimate solution, as you suggested, is to unpack the binary stream and obtain the corresponding fields based on the delimiter. We'll leave this as an optimization point for now, as it is sufficient for our current needs. However, as more applications are integrated later, we may optimize according to these solutions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139627409


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   Are you really reading the protocol? Read more carefully, rather than guessing.
   
   https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe/language-agent/Tracing.proto#L233-L237
   
   What you thought is actually impossible from gRPC stand point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1140020793


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   It should be works well with windows. Could you provide more detail about the error? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467675262

   At present, I think we can accept him. I checked the current official agents, they are basically only supported by the gRPC protocol. The satellite itself can be used as a protocol conversion, which may be an option for agents that need to be modified on a large scale. 
   If all Agents provide Kafka's agreement later, I think it can be deleted in satellite.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137466464


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   I'm not quite sure if it's like this. If it is, I'll continue tomorrow when I have time. Thank you. 
   https://github.com/zbw911/skywalking-satellite/blob/e86c410f1d06961f9b4be0dd26131772245f1ed2/plugins/forwarder/kafka/nativetracing/forwarder.go#L94
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1469215856

   There are still a lot of unprocessed previews here. Please handle them all.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136898398


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   Today I was looking for the entity of SniffData_SpanAttachedEvent corresponding to the code of a Java agent, but I couldn't find it. I also tried to debug the project with breakpoints, but it didn't go into this part of the code. Can we ignore this part of the code for now, or could you give me some hints on how to locate it? Thank you.   @mrproliu 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136995930


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   I carefully looked into it and we might not be able to add the SpanAttachedEvent feature to OAP anytime soon. So, for now, we can simply warn users that the current version of Kafka  forwarder  does not support this feature. It's not the best solution, but it's the only option we have for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137910692


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   @mrproliu As a log forwarder, what it is the current way(gRPC) to process this? Is this consistent with that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139616866


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   I think the best approach for now is to temporarily abandon support for *v1.SniffData_SpanAttachedEvent in Kafka mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137105296


##########
.github/workflows/e2e-istio.yaml:
##########
@@ -21,6 +21,7 @@ on:
   push:
     branches:
       - main
+      - kafka-forwards

Review Comment:
   Should be deleted too.



##########
.github/workflows/e2e-native.yaml:
##########
@@ -20,6 +20,7 @@ on:
   push:
     branches:
       - main
+      - kafka-forwards

Review Comment:
   Should be deleted too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137918463


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   Then, we need to make Kafka follow the same way. If the gRPC forwards to the same channel, the Kafka forward should to the same partition.
   @mrproliu Let's make sure this Kafka forward is always consistent with gRPC routing.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139956883


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   Sorry, I misunderstood your question about OAP earlier. I will try to   that later.
    
   @mrproliu  By the way, do you have detailed documentation or instructions for using e2e? 
   I've been getting errors when trying to use it in the WSL command line (ERROR Local Docker compose exited abnormally whilst running docker-compose: [--env-file /mnt/f/github/apache/skywalking-infra-e2e/examples/compose/env up -d]. exit status 1). I couldn't find a solution in the SkyWalking issue tracker either. I have to submit it to the GitHub workflow for verification, which is a bit inefficient.
   Can you confirm whether e2e can be run on WSL?
   Thank you .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467611653

   Directly setting up a Kafka sender at the agent side would be much better to maintain this kind of forwarder by satellite.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467626812

   About the existing Kafka forward of logs, we explained why it is there at https://github.com/apache/skywalking-satellite/pull/123#issuecomment-1328990817
   
   Anyway, @mrproliu, do you want to accept this? Or it is time we remove the Kafka forward(log) permanently in case of further confusion?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#issuecomment-1467676455

   Notice, I am not arguing for Kafka VS gRPC. The key point is why don't implement Kafka sender on .net agent side like Java and Python, it has better performance than using Satellite as an extra node.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139625186


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   Ok, I known. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] mrproliu commented on a diff in pull request #128: Kafka forwards

Posted by "mrproliu (via GitHub)" <gi...@apache.org>.
mrproliu commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139837096


##########
test/e2e/case/native-protocols-kafka/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,597 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:
+  # The log format pattern configuration.
+  log_pattern: ${SATELLITE_LOGGER_LOG_PATTERN:%time [%level][%field] - %msg}
+  # The time format pattern configuration.
+  time_pattern: ${SATELLITE_LOGGER_TIME_PATTERN:2006-01-02 15:04:05.000}
+  # The lowest level of printing allowed.
+  level: ${SATELLITE_LOGGER_LEVEL:info}
+
+# The Satellite self telemetry configuration.
+telemetry:
+  # The space concept for the deployment, such as the namespace concept in the Kubernetes.
+  cluster: ${SATELLITE_TELEMETRY_CLUSTER:satellite-cluster}
+  # The group concept for the deployment, such as the service resource concept in the Kubernetes.
+  service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
+  # The minimum running unit, such as the pod concept in the Kubernetes.
+  instance: ${SATELLITE_TELEMETRY_INSTANCE:satellite-instance}
+  # Telemetry export type, support "prometheus", "metrics_service" or "none"
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
+  # Export telemetry data through Prometheus server, only works on "export_type=prometheus".
+  prometheus:
+    # The prometheus server address.
+    address: ${SATELLITE_TELEMETRY_PROMETHEUS_ADDRESS::1234}
+    # The prometheus server metrics endpoint.
+    endpoint: ${SATELLITE_TELEMETRY_PROMETHEUS_ENDPOINT:/metrics}
+  # Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
+  metrics_service:
+    # The grpc-client plugin name, using the SkyWalking native batch meter protocol
+    client_name: ${SATELLITE_TELEMETRY_METRICS_SERVICE_CLIENT_NAME:grpc-client}
+    # The interval second for sending metrics
+    interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
+    # The prefix of telemetry metric name
+    metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+
+# The sharing plugins referenced by the specific plugins in the different pipes.
+sharing:
+  clients:
+    - plugin_name: "grpc-client"

Review Comment:
   This client is unnecessary, please remove.



##########
test/e2e/case/native-protocols-kafka/satellite_config_kafka.yaml:
##########
@@ -0,0 +1,597 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The logger configuration.
+logger:
+  # The log format pattern configuration.
+  log_pattern: ${SATELLITE_LOGGER_LOG_PATTERN:%time [%level][%field] - %msg}
+  # The time format pattern configuration.
+  time_pattern: ${SATELLITE_LOGGER_TIME_PATTERN:2006-01-02 15:04:05.000}
+  # The lowest level of printing allowed.
+  level: ${SATELLITE_LOGGER_LEVEL:info}
+
+# The Satellite self telemetry configuration.
+telemetry:
+  # The space concept for the deployment, such as the namespace concept in the Kubernetes.
+  cluster: ${SATELLITE_TELEMETRY_CLUSTER:satellite-cluster}
+  # The group concept for the deployment, such as the service resource concept in the Kubernetes.
+  service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
+  # The minimum running unit, such as the pod concept in the Kubernetes.
+  instance: ${SATELLITE_TELEMETRY_INSTANCE:satellite-instance}
+  # Telemetry export type, support "prometheus", "metrics_service" or "none"
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
+  # Export telemetry data through Prometheus server, only works on "export_type=prometheus".
+  prometheus:
+    # The prometheus server address.
+    address: ${SATELLITE_TELEMETRY_PROMETHEUS_ADDRESS::1234}
+    # The prometheus server metrics endpoint.
+    endpoint: ${SATELLITE_TELEMETRY_PROMETHEUS_ENDPOINT:/metrics}
+  # Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
+  metrics_service:
+    # The grpc-client plugin name, using the SkyWalking native batch meter protocol
+    client_name: ${SATELLITE_TELEMETRY_METRICS_SERVICE_CLIENT_NAME:grpc-client}
+    # The interval second for sending metrics
+    interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
+    # The prefix of telemetry metric name
+    metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+
+# The sharing plugins referenced by the specific plugins in the different pipes.
+sharing:
+  clients:
+    - plugin_name: "grpc-client"

Review Comment:
   This client is unnecessary, please delete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1140128315


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/Shopify/sarama"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			// SniffData_SpanAttachedEvent is from ebpf agent, skywalking-rover project.
+			// You could find it here,
+			// https://github.com/apache/skywalking-data-collect-protocol/blob/0da9c8b3e111fb51c9f8854cae16d4519462ecfe
+			// /language-agent/Tracing.proto#L244
+			// ref: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136909393
+			log.Logger.WithField("pipe", f.PipeName).Warnf("native-tracing-kafka-forwarder " +

Review Comment:
   > It should be works well with windows. Could you provide more detail about the error?
   
   the example of skywalking-infra-e2e:
   
   1,windows CMD:
   ```
   F:\github\apache\skywalking-infra-e2e\bin\windows>.\e2e.exe  run -c ../../examples/compose/e2e.yaml
   INFO load the e2e config successfully             
   : invalid reference format: no such image: apache/skywalking-oap-server:8.5.0-es6
   INFO deleting docker compose cluster...           
   INFO cleanup part finished successfully           
   ERROR Local Docker compose exited abnormally whilst running docker-compose.exe: [--env-file F:\github\apache\skywalking-infra-e2e\examples\compose\env up -d]. exit status 1 
   ```
   
   2,windows PowerShell:
   ```
   PS F:\github\apache\skywalking-infra-e2e\bin\windows> .\e2e.exe  run -c ../../examples/compose/e2e.yaml
   INFO load the e2e config successfully
   : invalid reference format: no such image: apache/skywalking-ui:8.5.0
   INFO deleting docker compose cluster...
   INFO cleanup part finished successfully
   ERROR Local Docker compose exited abnormally whilst running docker-compose.exe: [--env-file F:\github\apache\skywalking-infra-e2e\examples\compose\env up -d]. exit status 1
   ```
   
   3,WSL 2:
   ```
   root@DESKTOP-P1D7M3U:/mnt/f/github/apache/skywalking-infra-e2e/bin/linux# e2e run -c ../../examples/compose/e2e.yaml
   INFO load the e2e config successfully
   : invalid reference format: no such image: apache/skywalking-ui:8.5.0
   INFO deleting docker compose cluster...
   INFO cleanup part finished successfully
   ERROR Local Docker compose exited abnormally whilst running docker-compose: [--env-file /mnt/f/github/apache/skywalking-infra-e2e/examples/compose/env up -d]. exit status 1
   ```
   
   
   
   docker-compose :
   ```
   root@DESKTOP-P1D7M3U:/mnt/f/github/apache/skywalking-infra-e2e/examples/compose# docker-compose --env-file=./env  up -d
   [+] Running 2/2
    ⠿ Container compose-oap-1  Healthy                                                                                                                                                                                                                                  11.6s 
    ⠿ Container compose-ui-1   Started  
   ```
   
   I have been able to successfully start using the docker-compose command on both Windows and WSL. However, I have never been successful in using the e2e command directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] zbw911 commented on a diff in pull request #128: Kafka forwards

Posted by "zbw911 (via GitHub)" <gi...@apache.org>.
zbw911 commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139614673


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   The existing OAP server cannot parse SpanAttachedEvent (https://github.com/apache/skywalking/blob/2e04b069bfba24b1233dd66588217d08bdde7710/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java#L71-L86).
   
   Parsing in the current mode may look like the following:
   ``` 
   try{
           SegmentObject segment = SegmentObject.parseFrom(record.value().get());
           segmentParserService.send(segment);
   }
   catch(InvalidProtocolBufferException e)
   {
         try{
             SpanAttachedEvent spanAttachedEvent = SpanAttachedEvent.parseFrom(record.value().get());
             //.................
         }
         catch(InvalidProtocolBufferException e )
         {
         // ...........
         }
   }
   ```
   
   Making changes to the above code is somewhat difficult.
   
   Therefore, a warning message should be added to the tracing forwarder, explicitly indicating that the current Kafka mode does not support SpanAttachedEvent, so that users can be aware of this limitation and take appropriate action.
   
   Thus, instead of adding a key to SpanAttachedEvent, we should explicitly inform the user that the current Kafka mode does not support this event to prevent misuse.
   
   If necessary, we can increase the log level to error.
   
   Additionally, I suggest refining the type in goapi as a possible future option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1139630647


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   All protocols are only getting accepted if the backend supports it. 
   Be clear about the status, and be patient in reading projects and docs. 
   
   You are proposing a big change. From a maintainer perspective, it is already not easy to follow your first contribution as huge, you should get familiar with the project more rather than keeping saying `best approach` or `should`.
   
   Again and again, we have given you the e2e to simulate how to use ebpf agent and all these protocols. You at least should follow them and try them.
   
   What you replied makes me concerned you are only pushing this in hurry, rather than really want to understand what is the case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1137392862


##########
plugins/forwarder/kafka/nativelog/forwarder.go:
##########
@@ -80,10 +82,16 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		for _, l := range data.LogList.Logs {
+		for _, logData := range data.LogList.Logs {
+			logdata := &v3.LogData{}
+			err := proto.Unmarshal(logData, logdata)

Review Comment:
   I don't think we should loss performance due to add Kafka forward.
   That isn't how things work. 
   Please keep what we do for now, until open process mechanism discussion, and have new resolution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking-satellite] wu-sheng commented on a diff in pull request #128: Kafka forwards

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #128:
URL: https://github.com/apache/skywalking-satellite/pull/128#discussion_r1136908307


##########
plugins/forwarder/kafka/nativetracing/forwarder.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"fmt"
+	"reflect"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"github.com/Shopify/sarama"
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	"google.golang.org/protobuf/proto"
+
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+	Name     = "native-tracing-kafka-forwarder"
+	ShowName = "Native Tracing Kafka Forwarder"
+)
+
+type Forwarder struct {
+	config.CommonFields
+	Topic    string `mapstructure:"topic"` // The forwarder topic.
+	producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) ShowName() string {
+	return ShowName
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization Kafka forwarder with the SkyWalking native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return `
+# The remote topic. 
+topic: "skywalking-segments"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(sarama.Client)
+	if !ok {
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+			f.Name(), reflect.TypeOf(connection).String())
+	}
+	producer, err := sarama.NewSyncProducerFromClient(client)
+	if err != nil {
+		return err
+	}
+	f.producer = producer
+	return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	var message []*sarama.ProducerMessage
+	for _, e := range batch {
+		switch data := e.GetData().(type) {
+		case *v1.SniffData_Segment:
+			segmentObject := &v3.SegmentObject{}
+			err := proto.Unmarshal(data.Segment, segmentObject)
+			if err != nil {
+				return err
+			}
+
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Key:   sarama.StringEncoder(segmentObject.GetTraceSegmentId()),
+				Value: sarama.ByteEncoder(data.Segment),
+			})
+		case *v1.SniffData_SpanAttachedEvent:
+			message = append(message, &sarama.ProducerMessage{

Review Comment:
   `SniffData_SpanAttachedEvent` is from ebpf agent, skywalking-rover project.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org