You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/22 14:11:59 UTC
[skywalking] branch master updated: Add MeterReportService collectBatch method. (#8165)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 8436135 Add MeterReportService collectBatch method. (#8165)
8436135 is described below
commit 8436135dc919351a6d0ab42ff48c85e1b3104b53
Author: liqiangz <li...@gmail.com>
AuthorDate: Mon Nov 22 22:11:39 2021 +0800
Add MeterReportService collectBatch method. (#8165)
---
CHANGES.md | 1 +
apm-protocol/apm-network/src/main/proto | 2 +-
.../provider/handler/MeterServiceHandler.java | 30 +++++++
test/e2e-v2/cases/kafka/meter/e2e.yaml | 2 +-
.../meter/expected/metrics-has-value.yml} | 30 +------
.../kafka/meter/expected/service-instance.yml | 40 +++++++++
.../meter/expected/service.yml} | 31 ++-----
.../cases/{ => kafka}/meter/meter-cases.yaml | 2 +-
.../meter/{docker-compose.yml => batch-meter.yaml} | 31 ++-----
test/e2e-v2/cases/meter/docker-compose.yml | 25 +++++-
test/e2e-v2/cases/meter/meter-cases.yaml | 5 ++
.../controller/MeterMetricSenderController.java | 97 ++++++++++++++++++++++
.../java-test-service/e2e-protocol/src/main/proto | 2 +-
13 files changed, 215 insertions(+), 83 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6283e24..0883217 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -55,6 +55,7 @@ Release Notes.
* Fix concurrency bug in MAL `increase`-related calculation.
* Fix a null pointer bug when building `SampleFamily`.
* Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage.
+* Add `MeterReportService` `collectBatch` method.
#### UI
diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index 21492e4..fbbe955 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit 21492e496b797567d0e127f4510509baf73e10fd
+Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a
diff --git a/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
index b2f0d2e..73cdd44 100644
--- a/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor;
@@ -87,4 +88,33 @@ public class MeterServiceHandler extends MeterReportServiceGrpc.MeterReportServi
}
};
}
+
+ @Override // batch counterpart of the streaming meter report call: each inbound message carries a list of MeterData
+ public StreamObserver<MeterDataCollection> collectBatch(StreamObserver<Commands> responseObserver) { // returns the observer that consumes the inbound MeterDataCollection stream; responseObserver carries the reply
+ return new StreamObserver<MeterDataCollection>() {
+ @Override
+ public void onNext(MeterDataCollection meterDataCollection) { // called once per received collection
+ final MeterProcessor processor = processService.createProcessor(); // a new processor is created for every collection
+ try (HistogramMetrics.Timer ignored = histogram.createTimer()) { // try-with-resources closes the timer when the block exits, so it spans read + process
+ meterDataCollection.getMeterDataList().forEach(processor::read); // every MeterData in the collection is read into the same processor
+ processor.process(); // process() runs once, after the whole list has been read
+ } catch (Exception e) { // failures are counted and logged here; nothing is rethrown to the stream
+ errorCounter.inc();
+ log.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) { // the inbound stream reported an error
+ log.error(throwable.getMessage(), throwable);
+ responseObserver.onCompleted(); // the response is completed without any Commands message being sent
+ }
+
+ @Override
+ public void onCompleted() { // the inbound stream finished normally
+ responseObserver.onNext(Commands.newBuilder().build()); // reply with a Commands message built with no fields set
+ responseObserver.onCompleted();
+ }
+ };
+ }
}
diff --git a/test/e2e-v2/cases/kafka/meter/e2e.yaml b/test/e2e-v2/cases/kafka/meter/e2e.yaml
index 4fff535..398897d 100644
--- a/test/e2e-v2/cases/kafka/meter/e2e.yaml
+++ b/test/e2e-v2/cases/kafka/meter/e2e.yaml
@@ -42,4 +42,4 @@ verify:
interval: 3s
cases:
- includes:
- - ../../meter/meter-cases.yaml
+ - meter-cases.yaml
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml
similarity index 62%
copy from test/e2e-v2/cases/meter/docker-compose.yml
copy to test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml
index 7ab0404..5359e6d 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml
@@ -13,29 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-version: '2.1'
-
-services:
- oap:
- environment:
- SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
- extends:
- file: ../../script/docker-compose/base-compose.yml
- service: oap
- ports:
- - 12800
-
- provider:
- extends:
- file: ../../script/docker-compose/base-compose.yml
- service: provider
- environment:
- SW_METER_REPORT_INTERVAL: 5
- depends_on:
- oap:
- condition: service_healthy
- ports:
- - 9090
-
-networks:
- e2e:
+{{- contains . }}
+- key: {{ notEmpty .key }}
+ value: {{ ge .value 1 }}
+{{- end }}
diff --git a/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml b/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml
new file mode 100644
index 0000000..e525205
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml
@@ -0,0 +1,40 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+{{- contains . }}
+- id: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }}
+ name: provider1
+ attributes:
+ {{- contains .attributes }}
+ - name: OS Name
+ value: Linux
+ - name: hostname
+ value: {{ notEmpty .value }}
+ - name: Process No.
+ value: "1"
+ - name: Start Time
+ value: {{ notEmpty .value }}
+ - name: JVM Arguments
+ value: '{{ notEmpty .value }}'
+ - name: Jar Dependencies
+ value: '{{ notEmpty .value }}'
+ - name: ipv4s
+ value: {{ notEmpty .value }}
+ {{- end}}
+ language: JAVA
+ instanceuuid: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }}
+{{- end}}
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/kafka/meter/expected/service.yml
similarity index 62%
copy from test/e2e-v2/cases/meter/docker-compose.yml
copy to test/e2e-v2/cases/kafka/meter/expected/service.yml
index 7ab0404..3a33348 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/kafka/meter/expected/service.yml
@@ -13,29 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-version: '2.1'
-
-services:
- oap:
- environment:
- SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
- extends:
- file: ../../script/docker-compose/base-compose.yml
- service: oap
- ports:
- - 12800
-
- provider:
- extends:
- file: ../../script/docker-compose/base-compose.yml
- service: provider
- environment:
- SW_METER_REPORT_INTERVAL: 5
- depends_on:
- oap:
- condition: service_healthy
- ports:
- - 9090
-
-networks:
- e2e:
+{{- contains . }}
+- id: {{ b64enc "e2e-service-provider" }}.1
+ name: e2e-service-provider
+ group: ""
+{{- end }}
diff --git a/test/e2e-v2/cases/meter/meter-cases.yaml b/test/e2e-v2/cases/kafka/meter/meter-cases.yaml
similarity index 97%
copy from test/e2e-v2/cases/meter/meter-cases.yaml
copy to test/e2e-v2/cases/kafka/meter/meter-cases.yaml
index d1eeb7c..ce40b82 100644
--- a/test/e2e-v2/cases/meter/meter-cases.yaml
+++ b/test/e2e-v2/cases/kafka/meter/meter-cases.yaml
@@ -26,4 +26,4 @@
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_jvm_threads_live --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
- expected: expected/metrics-has-value.yml
+ expected: expected/metrics-has-value.yml
\ No newline at end of file
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/meter/batch-meter.yaml
similarity index 62%
copy from test/e2e-v2/cases/meter/docker-compose.yml
copy to test/e2e-v2/cases/meter/batch-meter.yaml
index 7ab0404..625d23a 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/meter/batch-meter.yaml
@@ -13,29 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-version: '2.1'
-
-services:
- oap:
- environment:
- SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
- extends:
- file: ../../script/docker-compose/base-compose.yml
- service: oap
- ports:
- - 12800
-
- provider:
- extends:
- file: ../../script/docker-compose/base-compose.yml
- service: provider
- environment:
- SW_METER_REPORT_INTERVAL: 5
- depends_on:
- oap:
- condition: service_healthy
- ports:
- - 9090
-
-networks:
- e2e:
+expSuffix: instance(['service'], ['instance'])
+metricPrefix: batch
+metricsRules:
+ - name: test
+ exp: batch_test
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/meter/docker-compose.yml
index 7ab0404..7590a7d 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/meter/docker-compose.yml
@@ -18,10 +18,12 @@ version: '2.1'
services:
oap:
environment:
- SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
+ SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth,batch-meter
extends:
file: ../../script/docker-compose/base-compose.yml
service: oap
+ volumes:
+ - ./batch-meter.yaml:/skywalking/config/meter-analyzer-config/batch-meter.yaml
ports:
- 12800
@@ -37,5 +39,26 @@ services:
ports:
- 9090
+ sender:
+ image: "adoptopenjdk/openjdk8:alpine-jre"
+ volumes:
+ - ./../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar
+ command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ]
+ environment:
+ OAP_HOST: oap
+ OAP_GRPC_PORT: 11800
+ networks:
+ - e2e
+ ports:
+ - 9093
+ healthcheck:
+ test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ oap:
+ condition: service_healthy
+
networks:
e2e:
diff --git a/test/e2e-v2/cases/meter/meter-cases.yaml b/test/e2e-v2/cases/meter/meter-cases.yaml
index d1eeb7c..e3bae75 100644
--- a/test/e2e-v2/cases/meter/meter-cases.yaml
+++ b/test/e2e-v2/cases/meter/meter-cases.yaml
@@ -27,3 +27,8 @@
expected: expected/metrics-has-value.yml
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
+ - query: |
+ curl -s -XPOST http://${sender_host}:${sender_9093}/sendBatchMetrics > /dev/null;
+ sleep 10;
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=batch_test --instance-name=test-instance --service-name=test-service |yq e 'to_entries' -
+ expected: expected/metrics-has-value.yml
\ No newline at end of file
diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java
new file mode 100644
index 0000000..876ed23
--- /dev/null
+++ b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.e2e.controller;
+
+import io.grpc.ManagedChannel;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CountDownLatch;
+import org.apache.skywalking.apm.network.common.v3.Commands;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
+import org.apache.skywalking.e2e.E2EConfiguration;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class MeterMetricSenderController { // HTTP-triggered sender that pushes a MeterDataCollection to an OAP server via MeterReportService.collectBatch
+ private static final int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50; // 52,428,800; passed to maxInboundMessageSize below
+ private final MeterReportServiceGrpc.MeterReportServiceStub grpcStub; // created once in the constructor and reused for every request
+
+ public MeterMetricSenderController(final E2EConfiguration configuration) { // builds the channel and stub from the configured OAP host and gRPC port
+ final ManagedChannel channel = NettyChannelBuilder.forAddress( // NOTE(review): channel is only held in this local, so this class has no way to shut it down later
+ configuration.getOapHost(), Integer.parseInt(configuration.getOapGrpcPort())) // Integer.parseInt throws NumberFormatException if the port is not numeric
+ .nameResolverFactory(new DnsNameResolverProvider())
+ .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
+ .usePlaintext() // no TLS on this channel
+ .build();
+
+ grpcStub = MeterReportServiceGrpc.newStub(channel);
+ }
+
+ @PostMapping("/sendBatchMetrics")
+ public String sendBatchMetrics() throws Exception { // POST /sendBatchMetrics: builds a one-element collection, sends it, then returns a fixed message
+ final MeterDataCollection.Builder builder =
+ MeterDataCollection.newBuilder()
+ .addMeterData(MeterData.newBuilder() // the collection holds exactly one MeterData
+ .setService("test-service")
+ .setTimestamp(System.currentTimeMillis()) // timestamp is the time of this request
+ .setServiceInstance("test-instance")
+ .setSingleValue(MeterSingleValue.newBuilder()
+ .setName("batch_test")
+ .setValue(100)
+ .build())
+ .build());
+
+ sendMetrics(builder.build()); // blocks until the RPC has completed or failed (see sendMetrics)
+
+ return "Metrics send success!"; // NOTE(review): returned on the onError path too, because sendMetrics does not report failure to its caller
+ }
+
+ void sendMetrics(final MeterDataCollection metrics) throws InterruptedException { // sends one collection over collectBatch and waits for the response stream to terminate
+ final CountDownLatch latch = new CountDownLatch(1); // released by whichever of onError / onCompleted fires
+
+ StreamObserver<MeterDataCollection> collect = grpcStub.collectBatch(new StreamObserver<Commands>() { // the anonymous observer receives the server's reply
+ @Override
+ public void onNext(final Commands commands) { // the returned Commands are intentionally ignored
+
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ throwable.printStackTrace(); // NOTE(review): the failure is only printed; the caller cannot tell it apart from success
+ latch.countDown(); // still release the waiter so the HTTP request can return
+ }
+
+ @Override
+ public void onCompleted() {
+ latch.countDown();
+ }
+ });
+
+ collect.onNext(metrics); // send the single collection
+
+ collect.onCompleted(); // close the request stream
+
+ latch.await(); // NOTE(review): no timeout, so this waits indefinitely if neither callback is ever invoked
+ }
+}
diff --git a/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto b/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto
index 21492e4..fbbe955 160000
--- a/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto
+++ b/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto
@@ -1 +1 @@
-Subproject commit 21492e496b797567d0e127f4510509baf73e10fd
+Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a