You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/31 16:07:37 UTC

[incubator-zipkin-brave] branch secondary-sampling created (now dcd4a76)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a change to branch secondary-sampling
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git.


      at dcd4a76  WIP: refine secondary sampling feature test to be realistic

This branch includes the following new commits:

     new dcd4a76  WIP: refine secondary sampling feature test to be realistic

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-zipkin-brave] 01/01: WIP: refine secondary sampling feature test to be realistic

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch secondary-sampling
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git

commit dcd4a76f53381fd0968a5cd3d423591d38feb5b8
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Sat Jun 1 00:06:17 2019 +0800

    WIP: refine secondary sampling feature test to be realistic
    
    @narayuna is one of the first folks to look at this code carefully. I
    think the intent wasn't clear. I'm changing the example to look more
    realistic and also include dynamic configuration hints.
---
 .../features/propagation/SecondarySampling.java    | 145 +++++++++++-------
 .../propagation/SecondarySamplingTest.java         | 163 +++++++++++++--------
 2 files changed, 190 insertions(+), 118 deletions(-)

diff --git a/brave/src/test/java/brave/features/propagation/SecondarySampling.java b/brave/src/test/java/brave/features/propagation/SecondarySampling.java
index 0873cf0..09dc81d 100644
--- a/brave/src/test/java/brave/features/propagation/SecondarySampling.java
+++ b/brave/src/test/java/brave/features/propagation/SecondarySampling.java
@@ -16,46 +16,80 @@
  */
 package brave.features.propagation;
 
+import brave.Tracing;
 import brave.handler.MutableSpan;
+import brave.propagation.B3SinglePropagation;
 import brave.propagation.Propagation.Getter;
 import brave.propagation.Propagation.KeyFactory;
 import brave.propagation.Propagation.Setter;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContextOrSamplingFlags;
-import brave.sampler.Sampler;
-import com.google.common.base.Splitter;
+import brave.sampler.RateLimitingSampler;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public final class SecondarySampling {
-  public static final class FinishedSpanHandler extends brave.handler.FinishedSpanHandler {
-    final Map<String, brave.handler.FinishedSpanHandler> configuredHandlers;
 
-    public FinishedSpanHandler(Map<String, brave.handler.FinishedSpanHandler> configuredHandlers) {
-      this.configuredHandlers = configuredHandlers;
+  public static SecondarySampling create() {
+    return new SecondarySampling();
+  }
+
+  Propagation.Factory propagationFactory = B3SinglePropagation.FACTORY;
+  final Map<String, brave.handler.FinishedSpanHandler> systemToHandlers = new ConcurrentHashMap<>();
+
+  SecondarySampling() {
+  }
+
+  public SecondarySampling putSystem(String system, brave.handler.FinishedSpanHandler handler) {
+    systemToHandlers.put(system, handler);
+    return this;
+  }
+
+  public SecondarySampling removeSystem(String system) {
+    systemToHandlers.remove(system);
+    return this;
+  }
+
+  public Tracing build(Tracing.Builder builder) {
+    return builder
+      .addFinishedSpanHandler(new FinishedSpanHandler(systemToHandlers))
+      // BRAVE6: we need a better collaboration model for propagation than wrapping, as it makes
+      // configuration complex.
+      .propagationFactory(new PropagationFactory(propagationFactory, systemToHandlers.keySet()))
+      .build();
+  }
+
+  static final class FinishedSpanHandler extends brave.handler.FinishedSpanHandler {
+    final Map<String, brave.handler.FinishedSpanHandler> systemToHandlers;
+
+    FinishedSpanHandler(Map<String, brave.handler.FinishedSpanHandler> systemToHandlers) {
+      this.systemToHandlers = systemToHandlers;
     }
 
     @Override public boolean handle(TraceContext context, MutableSpan span) {
       Extra extra = context.findExtra(Extra.class);
       if (extra == null) return true;
-      for (String state : extra.states.keySet()) {
-        brave.handler.FinishedSpanHandler handler = configuredHandlers.get(state);
-        if (handler != null) handler.handle(context, span);
+      for (String system : extra.systems.keySet()) {
+        brave.handler.FinishedSpanHandler handler = systemToHandlers.get(system);
+        if (handler != null && "1".equals(extra.systems.get(system).get("sampled"))) {
+          handler.handle(context, span);
+        }
       }
       return true;
     }
   }
 
-  public static final class PropagationFactory extends Propagation.Factory {
+  static final class PropagationFactory extends Propagation.Factory {
     final Propagation.Factory delegate;
-    final Set<String> configuredStates;
+    final Set<String> configuredSystems;
 
-    PropagationFactory(Propagation.Factory delegate, Set<String> configuredStates) {
+    PropagationFactory(Propagation.Factory delegate, Set<String> configuredSystems) {
       this.delegate = delegate;
-      this.configuredStates = configuredStates;
+      this.configuredSystems = configuredSystems;
     }
 
     @Override public boolean supportsJoin() {
@@ -63,38 +97,37 @@ public final class SecondarySampling {
     }
 
     @Override public <K> Propagation<K> create(KeyFactory<K> keyFactory) {
-      return new Propagation<>(delegate.create(keyFactory), keyFactory, configuredStates);
+      return new Propagation<>(delegate.create(keyFactory), keyFactory, configuredSystems);
     }
   }
 
   static final class Extra {
-    Map<String, Map<String, String>> states = new LinkedHashMap<>();
+    final Map<String, Map<String, String>> systems = new LinkedHashMap<>();
 
     @Override public String toString() {
-      return states.entrySet()
-          .stream()
-          .map(s -> s.getKey() + ":" + Extra.toString(s.getValue()))
-          .collect(Collectors.joining(";"));
+      return systems.entrySet()
+        .stream()
+        .map(s -> s.getKey() + ":" + Extra.toString(s.getValue()))
+        .collect(Collectors.joining(";"));
     }
 
     static String toString(Map<String, String> s) {
       return s.entrySet()
-          .stream()
-          .map(e -> e.getKey() + "=" + e.getValue())
-          .collect(Collectors.joining(","));
+        .stream()
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .collect(Collectors.joining(","));
     }
   }
 
   static class Propagation<K> implements brave.propagation.Propagation<K> {
-
     final brave.propagation.Propagation<K> delegate;
-    final Set<String> configuredStates;
+    final Set<String> configuredSystems;
     final K samplingKey;
 
     Propagation(brave.propagation.Propagation<K> delegate, KeyFactory<K> keyFactory,
-        Set<String> configuredStates) {
+      Set<String> configuredSystems) {
       this.delegate = delegate;
-      this.configuredStates = configuredStates;
+      this.configuredSystems = configuredSystems;
       this.samplingKey = keyFactory.create("sampling");
     }
 
@@ -126,7 +159,7 @@ public final class SecondarySampling {
     @Override public void inject(TraceContext traceContext, C carrier) {
       delegate.inject(traceContext, carrier);
       Extra sampled = traceContext.findExtra(Extra.class);
-      if (sampled == null || sampled.states.isEmpty()) return;
+      if (sampled == null || sampled.systems.isEmpty()) return;
       setter.put(carrier, samplingKey, sampled.toString());
     }
   }
@@ -134,13 +167,13 @@ public final class SecondarySampling {
   static final class Extractor<C, K> implements TraceContext.Extractor<C> {
     final TraceContext.Extractor<C> delegate;
     final Getter<C, K> getter;
-    final Set<String> configuredStates;
+    final Set<String> configuredSystems;
     final K samplingKey;
 
     Extractor(Propagation<K> propagation, Getter<C, K> getter) {
       this.delegate = propagation.delegate.extractor(getter);
       this.getter = getter;
-      this.configuredStates = propagation.configuredStates;
+      this.configuredSystems = propagation.configuredSystems;
       this.samplingKey = propagation.samplingKey;
     }
 
@@ -152,44 +185,48 @@ public final class SecondarySampling {
       TraceContextOrSamplingFlags.Builder builder = result.toBuilder().addExtra(extra);
       if (maybeValue == null) return builder.build();
 
-      for (String entry : Splitter.on(";").split(maybeValue)) {
-        String[] nameValue = entry.split(":");
+      for (String entry : maybeValue.split(";", 100)) {
+        String[] nameValue = entry.split(":", 2);
         String name = nameValue[0];
-        Map<String, String> state = Splitter.on(",").withKeyValueSeparator("=").split(nameValue[1]);
-
-        if (configuredStates.contains(name)) {
-          state = new LinkedHashMap<>(state); // make mutable
-          if (update(state)) {
-            if (state.get("sampled").equals("1")) {
-              builder.sampledLocal(); // this allows us to override the default decision
-            }
-            extra.states.put(name, state);
-          }
-        } else {
-          extra.states.put(name, state);
+
+        Map<String, String> systemToState = parseSystem(nameValue[1]);
+        if (configuredSystems.contains(name) && updateStateAndSample(systemToState)) {
+          builder.sampledLocal(); // this means data will be recorded
         }
+        if (!systemToState.isEmpty()) extra.systems.put(name, systemToState);
       }
 
       return builder.build();
     }
   }
 
-  static boolean update(Map<String, String> state) {
-    // if there's a rate, convert it to a sampling decision
-    String rate = state.remove("rate");
-    if (rate != null) {
+  static Map<String, String> parseSystem(String system) {
+    Map<String, String> result = new LinkedHashMap<>();
+    for (String entry : system.split(",", 100)) {
+      String[] nameValue = entry.split("=", 2);
+      result.put(nameValue[0], nameValue[1]);
+    }
+    return result;
+  }
+
+  static boolean updateStateAndSample(Map<String, String> state) {
+    // if there's a tps (traces per second) limit, convert it to a sampling decision
+    String tps = state.remove("tps");
+    if (tps != null) {
       // in real life the sampler would be cached
-      boolean decision = Sampler.create(Float.parseFloat(rate)).isSampled(1L);
+      boolean decision = RateLimitingSampler.create(Integer.parseInt(tps)).isSampled(1L);
       state.put("sampled", decision ? "1" : "0");
-    } else if (state.containsKey("ttl")) {
-      // decrement ttl if there is one
+      return decision;
+    }
+
+    if (state.containsKey("ttl")) { // decrement ttl if there is one
       String ttl = state.remove("ttl");
-      if (ttl != null && !ttl.equals("1")) {
-        state.put("ttl", Integer.toString(Integer.parseInt(ttl) - 1));
+      if (ttl.equals("1")) {
+        state.remove("sampled");
       } else {
-        return false; // remove the out-dated decision
+        state.put("ttl", Integer.toString(Integer.parseInt(ttl) - 1));
       }
     }
-    return true;
+    return "1".equals(state.get("sampled"));
   }
 }
diff --git a/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java b/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java
index 0852e90..c9677b0 100644
--- a/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java
+++ b/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java
@@ -20,12 +20,11 @@ import brave.Span;
 import brave.Span.Kind;
 import brave.Tracer;
 import brave.Tracing;
+import brave.features.propagation.SecondarySampling.Extra;
 import brave.handler.FinishedSpanHandler;
 import brave.handler.MutableSpan;
-import brave.propagation.B3SinglePropagation;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContextOrSamplingFlags;
-import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -33,33 +32,33 @@ import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
 
-import static brave.features.propagation.SecondarySampling.Extra;
 import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class SecondarySamplingTest {
   List<zipkin2.Span> zipkin = new ArrayList<>();
-  List<MutableSpan> zeus = new ArrayList<>(), apollo = new ArrayList<>();
-  Map<String, FinishedSpanHandler> stateToFinishedSpanHandler = ImmutableMap.of(
-      "zeus", new FinishedSpanHandler() {
-        @Override public boolean handle(TraceContext context, MutableSpan span) {
-          return zeus.add(span);
-        }
-      },
-      "apollo", new FinishedSpanHandler() {
-        @Override public boolean handle(TraceContext context, MutableSpan span) {
-          return apollo.add(span);
-        }
+  List<MutableSpan> edge = new ArrayList<>(), links = new ArrayList<>(), triage =
+    new ArrayList<>();
+  FinishedSpanHandler triageHandler = new FinishedSpanHandler() {
+    @Override public boolean handle(TraceContext context, MutableSpan span) {
+      return triage.add(span);
+    }
+  };
+
+  SecondarySampling secondarySampling = SecondarySampling.create()
+    .putSystem("edge", new FinishedSpanHandler() {
+      @Override public boolean handle(TraceContext context, MutableSpan span) {
+        return edge.add(span);
       }
-  );
-
-  Tracing tracing = Tracing.newBuilder()
-      .addFinishedSpanHandler(new SecondarySampling.FinishedSpanHandler(stateToFinishedSpanHandler))
-      .propagationFactory(new SecondarySampling.PropagationFactory(
-          B3SinglePropagation.FACTORY, stateToFinishedSpanHandler.keySet()
-      ))
-      .spanReporter(zipkin::add)
-      .build();
+    })
+    .putSystem("links", new FinishedSpanHandler() {
+      @Override public boolean handle(TraceContext context, MutableSpan span) {
+        return links.add(span);
+      }
+    });
+  Tracing tracing = secondarySampling.build(Tracing.newBuilder().spanReporter(zipkin::add));
 
   TraceContext.Extractor<Map<String, String>> extractor = tracing.propagation().extractor(Map::get);
   TraceContext.Injector<Map<String, String>> injector = tracing.propagation().injector(Map::put);
@@ -70,106 +69,142 @@ public class SecondarySamplingTest {
     tracing.close();
   }
 
-  /** This shows when primary trace status is not sampled, we can send to handlers anyway. */
+  /**
+   * This shows that when the primary trace is not sampled, we can send to handlers anyway.
+   *
+   * <p>At first, "triage" is not configured, so the tracer ignores it. Later, it is configured, so
+   * it starts receiving traces.
+   */
   @Test public void integrationTest() {
     map.put("b3", "0");
-    map.put("sampling", "zeus:rate=1.0,ttl=3;apollo:sampled=1;wookie:rate=0.05");
+    map.put("sampling", "edge:tps=1,ttl=3;links:sampled=1;triage:tps=5");
 
     Tracer tracer = tracing.tracer();
     Span span1 = tracer.nextSpan(extractor.extract(map)).name("span1").kind(Kind.SERVER).start();
     Span span2 = tracer.newChild(span1.context()).kind(Kind.CLIENT).name("span2").start();
     injector.inject(span2.context(), map);
-    assertThat(map).containsEntry("sampling",
-        "zeus:ttl=3,sampled=1;apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "edge:ttl=3,sampled=1;links:sampled=1;triage:tps=5");
 
     // hop 1
-    Span span3 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    Span span3 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span4 = tracer.newChild(span3.context()).kind(Kind.CLIENT).name("span3").start();
     injector.inject(span4.context(), map);
-    assertThat(map).containsEntry("sampling",
-        "zeus:sampled=1,ttl=2;apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "edge:sampled=1,ttl=2;links:sampled=1;triage:tps=5");
 
     // hop 2
-    Span span5 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    Span span5 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span6 = tracer.newChild(span5.context()).kind(Kind.CLIENT).name("span4").start();
     injector.inject(span6.context(), map);
-    assertThat(map).containsEntry("sampling",
-        "zeus:sampled=1,ttl=1;apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "edge:sampled=1,ttl=1;links:sampled=1;triage:tps=5");
 
     // hop 3
-    Span span7 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    Span span7 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span8 = tracer.newChild(span7.context()).kind(Kind.CLIENT).name("span5").start();
     injector.inject(span8.context(), map);
-    assertThat(map).containsEntry("sampling", "apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "links:sampled=1;triage:tps=5");
 
-    // hop 4
-    Span span9 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    // dynamic configuration adds triage processing
+    secondarySampling.putSystem("triage", triageHandler);
+
+    // hop 4, triage is now sampled
+    Span span9 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span10 = tracer.newChild(span9.context()).kind(Kind.CLIENT).name("span6").start();
     injector.inject(span10.context(), map);
-    assertThat(map).containsEntry("sampling", "apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "links:sampled=1;triage:sampled=1");
 
     asList(span1, span2, span3, span4, span5, span6, span7, span8, span9, span10)
-        .forEach(Span::finish);
+      .forEach(Span::finish);
 
     assertThat(zipkin).isEmpty();
-    assertThat(zeus).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(4);
-    assertThat(apollo).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(5);
+    assertThat(edge).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(3);
+    assertThat(links).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(5);
+    assertThat(triage).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(1);
   }
 
   @Test public void extract_samplesLocalWhenConfigured() {
     map.put("b3", "0");
-    map.put("sampling", "apollo:sampled=0;wookie:rate=0.05");
+    map.put("sampling", "links:sampled=0;triage:tps=5");
 
     assertThat(extractor.extract(map).sampledLocal()).isFalse();
 
     map.put("b3", "0");
-    map.put("sampling", "apollo:sampled=0;wookie:sampled=1");
+    map.put("sampling", "links:sampled=0;triage:sampled=1");
 
     assertThat(extractor.extract(map).sampledLocal()).isFalse();
 
     map.put("b3", "0");
-    map.put("sampling", "apollo:sampled=1;wookie:rate=0.05");
+    map.put("sampling", "links:sampled=1;triage:tps=5");
+
+    assertThat(extractor.extract(map).sampledLocal()).isTrue();
+  }
+
+  /** This shows an example of dynamic configuration */
+  @Test public void dynamicConfiguration() {
+    // base case: links is configured, triage is not. triage is in the headers, though!
+    map.put("b3", "0");
+    map.put("sampling", "links:sampled=1;triage:tps=5");
 
     assertThat(extractor.extract(map).sampledLocal()).isTrue();
+
+    // dynamic configuration removes "links" processing
+    secondarySampling.removeSystem("links");
+    assertThat(extractor.extract(map).sampledLocal()).isFalse();
+
+    // dynamic configuration adds triage processing
+    secondarySampling.putSystem("triage", triageHandler);
+    assertThat(extractor.extract(map).sampledLocal()).isTrue();
+
+    tracing.tracer().nextSpan(extractor.extract(map)).start().finish();
+    assertThat(zipkin).isEmpty();
+    assertThat(edge).isEmpty();
+    assertThat(links).isEmpty(); // no longer configured
+    assertThat(triage).hasSize(1); // now configured
   }
 
-  @Test public void extract_convertsConfiguredRateToDecision() {
+  @Test public void extract_convertsConfiguredTpsToDecision() {
     map.put("b3", "0");
-    map.put("sampling", "zeus:rate=1.0,ttl=3;apollo:sampled=0;wookie:rate=0.05");
+    map.put("sampling", "edge:tps=1,ttl=3;links:sampled=0;triage:tps=5");
 
     TraceContextOrSamplingFlags extracted = extractor.extract(map);
     Extra extra = (Extra) extracted.extra().get(0);
-    assertThat(extra.states)
-        .containsEntry("zeus", ImmutableMap.of("sampled", "1", "ttl", "3"))
-        .containsEntry("apollo", ImmutableMap.of("sampled", "0"))
-        .containsEntry("wookie", ImmutableMap.of("rate", "0.05"));
+    assertThat(extra.systems)
+      .containsEntry("edge", twoEntryMap("sampled", "1", "ttl", "3"))
+      .containsEntry("links", singletonMap("sampled", "0"))
+      .containsEntry("triage", singletonMap("tps", "5")); // triage is not configured
   }
 
   @Test public void extract_decrementsTtlWhenConfigured() {
     map.put("b3", "0");
-    map.put("sampling", "zeus:sampled=1,ttl=3;apollo:sampled=0,ttl=1;wookie:rate=0.05");
+    map.put("sampling", "edge:sampled=1,ttl=3;links:sampled=0,ttl=1;triage:tps=5");
 
     TraceContextOrSamplingFlags extracted = extractor.extract(map);
     Extra extra = (Extra) extracted.extra().get(0);
-    assertThat(extra.states)
-        .containsEntry("zeus", ImmutableMap.of("sampled", "1", "ttl", "2"))
-        .doesNotContainKey("apollo")
-        .containsEntry("wookie", ImmutableMap.of("rate", "0.05"));
+    assertThat(extra.systems)
+      .containsEntry("edge", twoEntryMap("sampled", "1", "ttl", "2"))
+      .doesNotContainKey("links")
+      .containsEntry("triage", singletonMap("tps", "5"));
   }
 
-  @Test public void injectWritesAllStates() {
+  @Test public void injectWritesAllSystems() {
     Extra extra = new Extra();
-    extra.states.put("zeus", ImmutableMap.of("rate", "1.0", "ttl", "3"));
-    extra.states.put("apollo", ImmutableMap.of("sampled", "0"));
-    extra.states.put("wookie", ImmutableMap.of("rate", "0.05"));
+    extra.systems.put("edge", twoEntryMap("tps", "1", "ttl", "3"));
+    extra.systems.put("links", singletonMap("sampled", "0"));
+    extra.systems.put("triage", singletonMap("tps", "5"));
 
     injector.inject(TraceContext.newBuilder()
-        .traceId(1L).spanId(2L)
-        .sampled(false)
-        .extra(asList(extra))
-        .build(), map);
+      .traceId(1L).spanId(2L)
+      .sampled(false)
+      .extra(singletonList(extra))
+      .build(), map);
 
     assertThat(map)
-        .containsEntry("sampling", "zeus:rate=1.0,ttl=3;apollo:sampled=0;wookie:rate=0.05");
+      .containsEntry("sampling", "edge:tps=1,ttl=3;links:sampled=0;triage:tps=5");
+  }
+
+  static <K, V> Map<K, V> twoEntryMap(K key1, V value1, K key2, V value2) {
+    Map<K, V> result = new LinkedHashMap<>();
+    result.put(key1, value1);
+    result.put(key2, value2);
+    return result;
   }
 }