You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/22 14:11:59 UTC

[skywalking] branch master updated: Add MeterReportService collectBatch method. (#8165)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8436135  Add MeterReportService collectBatch method. (#8165)
8436135 is described below

commit 8436135dc919351a6d0ab42ff48c85e1b3104b53
Author: liqiangz <li...@gmail.com>
AuthorDate: Mon Nov 22 22:11:39 2021 +0800

    Add MeterReportService collectBatch method. (#8165)
---
 CHANGES.md                                         |  1 +
 apm-protocol/apm-network/src/main/proto            |  2 +-
 .../provider/handler/MeterServiceHandler.java      | 30 +++++++
 test/e2e-v2/cases/kafka/meter/e2e.yaml             |  2 +-
 .../meter/expected/metrics-has-value.yml}          | 30 +------
 .../kafka/meter/expected/service-instance.yml      | 40 +++++++++
 .../meter/expected/service.yml}                    | 31 ++-----
 .../cases/{ => kafka}/meter/meter-cases.yaml       |  2 +-
 .../meter/{docker-compose.yml => batch-meter.yaml} | 31 ++-----
 test/e2e-v2/cases/meter/docker-compose.yml         | 25 +++++-
 test/e2e-v2/cases/meter/meter-cases.yaml           |  5 ++
 .../controller/MeterMetricSenderController.java    | 97 ++++++++++++++++++++++
 .../java-test-service/e2e-protocol/src/main/proto  |  2 +-
 13 files changed, 215 insertions(+), 83 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6283e24..0883217 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -55,6 +55,7 @@ Release Notes.
 * Fix concurrency bug in MAL `increase`-related calculation.
 * Fix a null pointer bug when building `SampleFamily`.
 * Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage.
+* Add `MeterReportService` `collectBatch` method. 
 
 #### UI
 
diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index 21492e4..fbbe955 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit 21492e496b797567d0e127f4510509baf73e10fd
+Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a
diff --git a/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
index b2f0d2e..73cdd44 100644
--- a/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.network.common.v3.Commands;
 import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
 import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
 import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
 import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor;
@@ -87,4 +88,33 @@ public class MeterServiceHandler extends MeterReportServiceGrpc.MeterReportServi
             }
         };
     }
+
+    @Override
+    public StreamObserver<MeterDataCollection> collectBatch(StreamObserver<Commands> responseObserver) { // streaming receiver where each inbound message is a MeterDataCollection
+        return new StreamObserver<MeterDataCollection>() {
+            @Override
+            public void onNext(MeterDataCollection meterDataCollection) {
+                final MeterProcessor processor = processService.createProcessor(); // a new processor is created for every inbound collection
+                try (HistogramMetrics.Timer ignored = histogram.createTimer()) { // try-with-resources closes the timer when this block exits
+                    meterDataCollection.getMeterDataList().forEach(processor::read); // every MeterData in the collection is read by the same processor
+                    processor.process(); // called once, after all entries of the collection were read
+                } catch (Exception e) { // failures are counted and logged here, not rethrown
+                    errorCounter.inc();
+                    log.error(e.getMessage(), e);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                log.error(throwable.getMessage(), throwable);
+                responseObserver.onCompleted(); // the response is completed without sending a Commands message
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onNext(Commands.newBuilder().build()); // reply with a Commands message built with no fields set
+                responseObserver.onCompleted();
+            }
+        };
+    }
 }
diff --git a/test/e2e-v2/cases/kafka/meter/e2e.yaml b/test/e2e-v2/cases/kafka/meter/e2e.yaml
index 4fff535..398897d 100644
--- a/test/e2e-v2/cases/kafka/meter/e2e.yaml
+++ b/test/e2e-v2/cases/kafka/meter/e2e.yaml
@@ -42,4 +42,4 @@ verify:
     interval: 3s
   cases:
     - includes:
-        - ../../meter/meter-cases.yaml
+        - meter-cases.yaml
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml
similarity index 62%
copy from test/e2e-v2/cases/meter/docker-compose.yml
copy to test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml
index 7ab0404..5359e6d 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml
@@ -13,29 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: '2.1'
-
-services:
-  oap:
-    environment:
-      SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
-    extends:
-      file: ../../script/docker-compose/base-compose.yml
-      service: oap
-    ports:
-      - 12800
-
-  provider:
-    extends:
-      file: ../../script/docker-compose/base-compose.yml
-      service: provider
-    environment:
-      SW_METER_REPORT_INTERVAL: 5
-    depends_on:
-      oap:
-        condition: service_healthy
-    ports:
-      - 9090
-
-networks:
-  e2e:
+{{- contains . }}
+- key: {{ notEmpty .key }}
+  value: {{ ge .value 1 }}
+{{- end }}
diff --git a/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml b/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml
new file mode 100644
index 0000000..e525205
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml
@@ -0,0 +1,40 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+{{- contains . }}
+- id: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }}
+  name: provider1
+  attributes:
+  {{- contains .attributes }}
+  - name: OS Name
+    value: Linux
+  - name: hostname
+    value: {{ notEmpty .value }}
+  - name: Process No.
+    value: "1"
+  - name: Start Time
+    value: {{ notEmpty .value }}
+  - name: JVM Arguments
+    value: '{{ notEmpty .value }}'
+  - name: Jar Dependencies
+    value: '{{ notEmpty .value }}'
+  - name: ipv4s
+    value: {{ notEmpty .value }}
+  {{- end}}
+  language: JAVA
+  instanceuuid: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }}
+{{- end}}
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/kafka/meter/expected/service.yml
similarity index 62%
copy from test/e2e-v2/cases/meter/docker-compose.yml
copy to test/e2e-v2/cases/kafka/meter/expected/service.yml
index 7ab0404..3a33348 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/kafka/meter/expected/service.yml
@@ -13,29 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: '2.1'
-
-services:
-  oap:
-    environment:
-      SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
-    extends:
-      file: ../../script/docker-compose/base-compose.yml
-      service: oap
-    ports:
-      - 12800
-
-  provider:
-    extends:
-      file: ../../script/docker-compose/base-compose.yml
-      service: provider
-    environment:
-      SW_METER_REPORT_INTERVAL: 5
-    depends_on:
-      oap:
-        condition: service_healthy
-    ports:
-      - 9090
-
-networks:
-  e2e:
+{{- contains . }}
+- id: {{ b64enc "e2e-service-provider" }}.1
+  name: e2e-service-provider
+  group: ""
+{{- end }}
diff --git a/test/e2e-v2/cases/meter/meter-cases.yaml b/test/e2e-v2/cases/kafka/meter/meter-cases.yaml
similarity index 97%
copy from test/e2e-v2/cases/meter/meter-cases.yaml
copy to test/e2e-v2/cases/kafka/meter/meter-cases.yaml
index d1eeb7c..ce40b82 100644
--- a/test/e2e-v2/cases/meter/meter-cases.yaml
+++ b/test/e2e-v2/cases/kafka/meter/meter-cases.yaml
@@ -26,4 +26,4 @@
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_jvm_threads_live --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
       expected: expected/metrics-has-value.yml
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
-      expected: expected/metrics-has-value.yml
+      expected: expected/metrics-has-value.yml
\ No newline at end of file
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/meter/batch-meter.yaml
similarity index 62%
copy from test/e2e-v2/cases/meter/docker-compose.yml
copy to test/e2e-v2/cases/meter/batch-meter.yaml
index 7ab0404..625d23a 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/meter/batch-meter.yaml
@@ -13,29 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: '2.1'
-
-services:
-  oap:
-    environment:
-      SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
-    extends:
-      file: ../../script/docker-compose/base-compose.yml
-      service: oap
-    ports:
-      - 12800
-
-  provider:
-    extends:
-      file: ../../script/docker-compose/base-compose.yml
-      service: provider
-    environment:
-      SW_METER_REPORT_INTERVAL: 5
-    depends_on:
-      oap:
-        condition: service_healthy
-    ports:
-      - 9090
-
-networks:
-  e2e:
+expSuffix: instance(['service'], ['instance'])
+metricPrefix: batch
+metricsRules:
+  - name: test
+    exp: batch_test
diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/meter/docker-compose.yml
index 7ab0404..7590a7d 100644
--- a/test/e2e-v2/cases/meter/docker-compose.yml
+++ b/test/e2e-v2/cases/meter/docker-compose.yml
@@ -18,10 +18,12 @@ version: '2.1'
 services:
   oap:
     environment:
-      SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
+      SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth,batch-meter
     extends:
       file: ../../script/docker-compose/base-compose.yml
       service: oap
+    volumes:
+      - ./batch-meter.yaml:/skywalking/config/meter-analyzer-config/batch-meter.yaml
     ports:
       - 12800
 
@@ -37,5 +39,26 @@ services:
     ports:
       - 9090
 
+  sender:
+    image: "adoptopenjdk/openjdk8:alpine-jre"
+    volumes:
+      - ./../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar
+    command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ]
+    environment:
+      OAP_HOST: oap
+      OAP_GRPC_PORT: 11800
+    networks:
+      - e2e
+    ports:
+      - 9093
+    healthcheck:
+      test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      oap:
+        condition: service_healthy
+
 networks:
   e2e:
diff --git a/test/e2e-v2/cases/meter/meter-cases.yaml b/test/e2e-v2/cases/meter/meter-cases.yaml
index d1eeb7c..e3bae75 100644
--- a/test/e2e-v2/cases/meter/meter-cases.yaml
+++ b/test/e2e-v2/cases/meter/meter-cases.yaml
@@ -27,3 +27,8 @@
       expected: expected/metrics-has-value.yml
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
       expected: expected/metrics-has-value.yml
+    - query: |
+        curl -s -XPOST http://${sender_host}:${sender_9093}/sendBatchMetrics > /dev/null;
+        sleep 10;
+        swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=batch_test --instance-name=test-instance --service-name=test-service |yq e 'to_entries' -
+      expected: expected/metrics-has-value.yml
\ No newline at end of file
diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java
new file mode 100644
index 0000000..876ed23
--- /dev/null
+++ b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.e2e.controller;
+
+import io.grpc.ManagedChannel;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CountDownLatch;
+import org.apache.skywalking.apm.network.common.v3.Commands;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
+import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
+import org.apache.skywalking.e2e.E2EConfiguration;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class MeterMetricSenderController { // REST controller that sends a fixed MeterDataCollection to the configured OAP host via gRPC collectBatch
+    private static final int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50; // 52,428,800; passed to maxInboundMessageSize when the channel is built
+    private final MeterReportServiceGrpc.MeterReportServiceStub grpcStub; // StreamObserver-based stub, created once in the constructor
+
+    public MeterMetricSenderController(final E2EConfiguration configuration) {
+        final ManagedChannel channel = NettyChannelBuilder.forAddress( // NOTE(review): the channel is never shut down anywhere in this class
+            configuration.getOapHost(), Integer.parseInt(configuration.getOapGrpcPort())) // the port is read from configuration as a String and parsed here
+                                                          .nameResolverFactory(new DnsNameResolverProvider())
+                                                          .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
+                                                          .usePlaintext()
+                                                          .build();
+
+        grpcStub = MeterReportServiceGrpc.newStub(channel);
+    }
+
+    @PostMapping("/sendBatchMetrics")
+    public String sendBatchMetrics() throws Exception { // handles POST /sendBatchMetrics
+        final MeterDataCollection.Builder builder =
+            MeterDataCollection.newBuilder()
+                               .addMeterData(MeterData.newBuilder() // the collection holds exactly one MeterData entry
+                                                      .setService("test-service")
+                                                      .setTimestamp(System.currentTimeMillis())
+                                                      .setServiceInstance("test-instance")
+                                                      .setSingleValue(MeterSingleValue.newBuilder()
+                                                                                      .setName("batch_test")
+                                                                                      .setValue(100)
+                                                                                      .build())
+                                                      .build());
+
+        sendMetrics(builder.build()); // blocks until the response observer reports completion or an error
+
+        return "Metrics send success!"; // returned whenever sendMetrics returns normally, including after onError fired
+    }
+
+    void sendMetrics(final MeterDataCollection metrics) throws InterruptedException { // sends one collection on a collectBatch stream and waits for the call to end
+        final CountDownLatch latch = new CountDownLatch(1); // counted down by onError or onCompleted of the response observer below
+
+        StreamObserver<MeterDataCollection> collect = grpcStub.collectBatch(new StreamObserver<Commands>() {
+            @Override
+            public void onNext(final Commands commands) { // the Commands reply is ignored
+
+            }
+
+            @Override
+            public void onError(final Throwable throwable) {
+                throwable.printStackTrace(); // the error is only printed; it is not propagated to the caller
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                latch.countDown();
+            }
+        });
+
+        collect.onNext(metrics);
+
+        collect.onCompleted(); // the single collection sent above is the only message on this stream
+
+        latch.await(); // no timeout: waits indefinitely if neither callback is invoked
+    }
+}
diff --git a/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto b/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto
index 21492e4..fbbe955 160000
--- a/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto
+++ b/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto
@@ -1 +1 @@
-Subproject commit 21492e496b797567d0e127f4510509baf73e10fd
+Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a