You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/29 07:34:51 UTC

[incubator-skywalking] 01/01: Support sampling trace at server side and keep metric right.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch backend-sampling
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git

commit 790d6ff16b7a44c43f62b8275e84b57527d3eeaa
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Nov 29 15:34:39 2018 +0800

    Support sampling trace at server side and keep metric right.
---
 docker/config/application.yml                      |  1 +
 .../trace/provider/TraceModuleProvider.java        | 19 ++++++--
 .../trace/provider/TraceServiceModuleConfig.java   |  5 ++
 .../listener/segment/SegmentSpanListener.java      | 54 +++++++++++++++++---
 .../listener/segment/TraceSegmentSampler.java      | 57 ++++++++++++++++++++++
 .../listener/segment/TraceSegmentSamplerTest.java  | 47 ++++++++++++++++++
 .../src/main/assembly/application.yml              |  1 +
 .../src/main/resources/application.yml             |  5 +-
 8 files changed, 175 insertions(+), 14 deletions(-)

diff --git a/docker/config/application.yml b/docker/config/application.yml
index 9966bcd..f1592b7 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -68,6 +68,7 @@ receiver-trace:
     bufferOffsetMaxFileSize: ${SW_RECEIVER_BUFFER_OFFSET_MAX_FILE_SIZE:100} # Unit is MB
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
+    sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
 receiver-jvm:
   default:
 service-mesh:
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 480998b..c30daac 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -20,13 +20,22 @@ package org.apache.skywalking.oap.server.receiver.trace.provider;
 
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
 import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
@@ -61,14 +70,14 @@ public class TraceModuleProvider extends ModuleProvider {
         SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
         listenerManager.add(new MultiScopesSpanListener.Factory());
         listenerManager.add(new ServiceMappingSpanListener.Factory());
-        listenerManager.add(new SegmentSpanListener.Factory());
+        listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
         segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
 
         listenerManager = new SegmentParserListenerManager();
         listenerManager.add(new MultiScopesSpanListener.Factory());
         listenerManager.add(new ServiceMappingSpanListener.Factory());
-        listenerManager.add(new SegmentSpanListener.Factory());
+        listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
         segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
 
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 578cdd0..5a36953 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -29,4 +29,9 @@ public class TraceServiceModuleConfig extends ModuleConfig {
     @Setter @Getter private int bufferOffsetMaxFileSize;
     @Setter @Getter private int bufferDataMaxFileSize;
     @Setter @Getter private boolean bufferFileCleanWhenRestart;
+    /**
+     * The sample rate precision is 1/10000.
+     * 10000 means 100% sample in default.
+     */
+    @Setter @Getter private int sampleRate = 10000;
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index f7e8111..9add272 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -21,12 +21,20 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
 import org.apache.skywalking.apm.network.language.agent.UniqueId;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.source.Segment;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.library.util.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.FirstSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -36,12 +44,15 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
     private static final Logger logger = LoggerFactory.getLogger(SegmentSpanListener.class);
 
     private final SourceReceiver sourceReceiver;
+    private final TraceSegmentSampler sampler;
     private final Segment segment = new Segment();
     private final EndpointInventoryCache serviceNameCacheService;
+    private SAMPLE_STATUS sampleStatus = SAMPLE_STATUS.UNKNOWN;
     private int entryEndpointId = 0;
     private int firstEndpointId = 0;
 
-    private SegmentSpanListener(ModuleManager moduleManager) {
+    private SegmentSpanListener(ModuleManager moduleManager, TraceSegmentSampler sampler) {
+        this.sampler = sampler;
         this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
         this.serviceNameCacheService = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
     }
@@ -52,6 +63,10 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
 
     @Override
     public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
+        if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
+            return;
+        }
+
         long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
 
         segment.setSegmentId(segmentCoreInfo.getSegmentId());
@@ -75,6 +90,18 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
     }
 
     @Override public void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo) {
+        if (sampleStatus.equals(SAMPLE_STATUS.UNKNOWN) || sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
+            if (sampler.shouldSample(uniqueId)) {
+                sampleStatus = SAMPLE_STATUS.SAMPLED;
+            } else {
+                sampleStatus = SAMPLE_STATUS.IGNORE;
+            }
+        }
+
+        if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
+            return;
+        }
+
         StringBuilder traceIdBuilder = new StringBuilder();
         for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
             if (i == 0) {
@@ -91,6 +118,10 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
             logger.debug("segment listener build, segment id: {}", segment.getSegmentId());
         }
 
+        if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
+            return;
+        }
+
         if (entryEndpointId == 0) {
             segment.setEndpointId(firstEndpointId);
             segment.setEndpointName(serviceNameCacheService.get(firstEndpointId).getName());
@@ -102,10 +133,19 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
         sourceReceiver.receive(segment);
     }
 
+    private enum SAMPLE_STATUS {
+        UNKNOWN, SAMPLED, IGNORE
+    }
+
     public static class Factory implements SpanListenerFactory {
+        private TraceSegmentSampler sampler;
+
+        public Factory(int segmentSamplingRate) {
+            this.sampler = new TraceSegmentSampler(segmentSamplingRate);
+        }
 
         @Override public SpanListener create(ModuleManager moduleManager) {
-            return new SegmentSpanListener(moduleManager);
+            return new SegmentSpanListener(moduleManager, sampler);
         }
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/TraceSegmentSampler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/TraceSegmentSampler.java
new file mode 100644
index 0000000..bfafc28
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/TraceSegmentSampler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment;
+
+import java.util.List;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+
+/**
+ * The sampler makes the sampling mechanism work at the backend side.
+ *
+ * The sample check mechanism is simple and effective when the backend runs in cluster mode. It is based on the traceId,
+ * which is constituted by 3 Longs; according to GlobalIdGenerator, the last four digits of the last Long are a
+ * sequence, so they are suitable for sampling. A traceId that does not have exactly 3 parts is never sampled.
+ *
+ * Set rate = x (precision is 1/10000, so 10000 means 100%)
+ *
+ * Then take the remainder of the last Long in the traceId divided by 10000, y = lastLong % 10000
+ *
+ * Sample result: y &lt; x sampled, y &gt;= x ignored
+ *
+ * @author wusheng
+ */
+public class TraceSegmentSampler {
+    private int sampleRate = 10000;
+
+    public TraceSegmentSampler(int sampleRate) {
+        this.sampleRate = sampleRate;
+    }
+
+    public boolean shouldSample(UniqueId uniqueId) {
+        List<Long> idPartsList = uniqueId.getIdPartsList();
+        if (idPartsList.size() == 3) {
+            Long lastLong = idPartsList.get(2);
+            long sampleValue = lastLong % 10000; // Java's % keeps the sign of lastLong
+            if (sampleValue < sampleRate) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/TraceSegmentSamplerTest.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/TraceSegmentSamplerTest.java
new file mode 100644
index 0000000..d55cd71
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/TraceSegmentSamplerTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment;
+
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TraceSegmentSamplerTest {
+    @Test
+    public void sample() { // rate 100: sampled only when (last id part % 10000) < 100
+        TraceSegmentSampler sampler = new TraceSegmentSampler(100);
+        Assert.assertTrue(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(0).build()));
+        Assert.assertTrue(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(50).build()));
+        Assert.assertTrue(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(99).build()));
+        Assert.assertFalse(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(100).build()));
+        Assert.assertFalse(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(101).build()));
+        Assert.assertTrue(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(10000).build()));
+        Assert.assertTrue(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(10001).build()));
+        Assert.assertFalse(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(1019903).build()));
+    }
+
+    @Test
+    public void IllegalTraceIDSample() { // ids without exactly 3 parts are never sampled
+        TraceSegmentSampler sampler = new TraceSegmentSampler(100);
+        Assert.assertFalse(sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).build()));
+
+        Assert.assertFalse(
+            sampler.shouldSample(UniqueId.newBuilder().addIdParts(123).addIdParts(2).addIdParts(23).addIdParts(3).build()));
+    }
+}
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index f97c9e8..a6fb9f3 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -67,6 +67,7 @@ receiver-trace:
     bufferOffsetMaxFileSize: ${SW_RECEIVER_BUFFER_OFFSET_MAX_FILE_SIZE:100} # Unit is MB
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
+    sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
 receiver-jvm:
   default:
 service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 663f833..1588b49 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -16,8 +16,8 @@
 
 cluster:
   standalone:
-   # Please check your ZooKeeper is 3.5+, However, it is also compatible with ZooKeeper 3.4.x. Replace the ZooKeeper 3.5+
-   # library the oap-libs folder with your ZooKeeper 3.4.x library.
+  # Please check your ZooKeeper is 3.5+, However, it is also compatible with ZooKeeper 3.4.x. Replace the ZooKeeper 3.5+
+  # library the oap-libs folder with your ZooKeeper 3.4.x library.
 #  zookeeper:
 #    hostPort: ${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}
 #    #Retry Policy
@@ -68,6 +68,7 @@ receiver-trace:
     bufferOffsetMaxFileSize: ${SW_RECEIVER_BUFFER_OFFSET_MAX_FILE_SIZE:100} # Unit is MB
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
+    sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
 receiver-jvm:
   default:
 service-mesh: