You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/06/03 00:46:35 UTC

[incubator-zipkin-brave] branch master updated: Refines secondary sampling feature test to be realistic (#916)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git


The following commit(s) were added to refs/heads/master by this push:
     new ed77ea0  Refines secondary sampling feature test to be realistic (#916)
ed77ea0 is described below

commit ed77ea03cf0c9f3b365f232c73e8aa41ff770bff
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Mon Jun 3 08:46:31 2019 +0800

    Refines secondary sampling feature test to be realistic (#916)
    
    @narayuna is one of the first folks to look at this code carefully. I
    think the intent wasn't clear. I'm changing the example to look more
    realistic and also include dynamic configuration hints.
---
 .../features/propagation/SecondarySampling.java    | 145 +++++++++++-------
 .../propagation/SecondarySamplingTest.java         | 163 +++++++++++++--------
 2 files changed, 190 insertions(+), 118 deletions(-)

diff --git a/brave/src/test/java/brave/features/propagation/SecondarySampling.java b/brave/src/test/java/brave/features/propagation/SecondarySampling.java
index 0873cf0..09dc81d 100644
--- a/brave/src/test/java/brave/features/propagation/SecondarySampling.java
+++ b/brave/src/test/java/brave/features/propagation/SecondarySampling.java
@@ -16,46 +16,80 @@
  */
 package brave.features.propagation;
 
+import brave.Tracing;
 import brave.handler.MutableSpan;
+import brave.propagation.B3SinglePropagation;
 import brave.propagation.Propagation.Getter;
 import brave.propagation.Propagation.KeyFactory;
 import brave.propagation.Propagation.Setter;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContextOrSamplingFlags;
-import brave.sampler.Sampler;
-import com.google.common.base.Splitter;
+import brave.sampler.RateLimitingSampler;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public final class SecondarySampling {
-  public static final class FinishedSpanHandler extends brave.handler.FinishedSpanHandler {
-    final Map<String, brave.handler.FinishedSpanHandler> configuredHandlers;
 
-    public FinishedSpanHandler(Map<String, brave.handler.FinishedSpanHandler> configuredHandlers) {
-      this.configuredHandlers = configuredHandlers;
+  public static SecondarySampling create() {
+    return new SecondarySampling();
+  }
+
+  Propagation.Factory propagationFactory = B3SinglePropagation.FACTORY;
+  final Map<String, brave.handler.FinishedSpanHandler> systemToHandlers = new ConcurrentHashMap<>();
+
+  SecondarySampling() {
+  }
+
+  public SecondarySampling putSystem(String system, brave.handler.FinishedSpanHandler handler) {
+    systemToHandlers.put(system, handler);
+    return this;
+  }
+
+  public SecondarySampling removeSystem(String system) {
+    systemToHandlers.remove(system);
+    return this;
+  }
+
+  public Tracing build(Tracing.Builder builder) {
+    return builder
+      .addFinishedSpanHandler(new FinishedSpanHandler(systemToHandlers))
+      // BRAVE6: we need a better collaboration model for propagation than wrapping, as it makes
+      // configuration complex.
+      .propagationFactory(new PropagationFactory(propagationFactory, systemToHandlers.keySet()))
+      .build();
+  }
+
+  static final class FinishedSpanHandler extends brave.handler.FinishedSpanHandler {
+    final Map<String, brave.handler.FinishedSpanHandler> systemToHandlers;
+
+    FinishedSpanHandler(Map<String, brave.handler.FinishedSpanHandler> systemToHandlers) {
+      this.systemToHandlers = systemToHandlers;
     }
 
     @Override public boolean handle(TraceContext context, MutableSpan span) {
       Extra extra = context.findExtra(Extra.class);
       if (extra == null) return true;
-      for (String state : extra.states.keySet()) {
-        brave.handler.FinishedSpanHandler handler = configuredHandlers.get(state);
-        if (handler != null) handler.handle(context, span);
+      for (String system : extra.systems.keySet()) {
+        brave.handler.FinishedSpanHandler handler = systemToHandlers.get(system);
+        if (handler != null && "1".equals(extra.systems.get(system).get("sampled"))) {
+          handler.handle(context, span);
+        }
       }
       return true;
     }
   }
 
-  public static final class PropagationFactory extends Propagation.Factory {
+  static final class PropagationFactory extends Propagation.Factory {
     final Propagation.Factory delegate;
-    final Set<String> configuredStates;
+    final Set<String> configuredSystems;
 
-    PropagationFactory(Propagation.Factory delegate, Set<String> configuredStates) {
+    PropagationFactory(Propagation.Factory delegate, Set<String> configuredSystems) {
       this.delegate = delegate;
-      this.configuredStates = configuredStates;
+      this.configuredSystems = configuredSystems;
     }
 
     @Override public boolean supportsJoin() {
@@ -63,38 +97,37 @@ public final class SecondarySampling {
     }
 
     @Override public <K> Propagation<K> create(KeyFactory<K> keyFactory) {
-      return new Propagation<>(delegate.create(keyFactory), keyFactory, configuredStates);
+      return new Propagation<>(delegate.create(keyFactory), keyFactory, configuredSystems);
     }
   }
 
   static final class Extra {
-    Map<String, Map<String, String>> states = new LinkedHashMap<>();
+    final Map<String, Map<String, String>> systems = new LinkedHashMap<>();
 
     @Override public String toString() {
-      return states.entrySet()
-          .stream()
-          .map(s -> s.getKey() + ":" + Extra.toString(s.getValue()))
-          .collect(Collectors.joining(";"));
+      return systems.entrySet()
+        .stream()
+        .map(s -> s.getKey() + ":" + Extra.toString(s.getValue()))
+        .collect(Collectors.joining(";"));
     }
 
     static String toString(Map<String, String> s) {
       return s.entrySet()
-          .stream()
-          .map(e -> e.getKey() + "=" + e.getValue())
-          .collect(Collectors.joining(","));
+        .stream()
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .collect(Collectors.joining(","));
     }
   }
 
   static class Propagation<K> implements brave.propagation.Propagation<K> {
-
     final brave.propagation.Propagation<K> delegate;
-    final Set<String> configuredStates;
+    final Set<String> configuredSystems;
     final K samplingKey;
 
     Propagation(brave.propagation.Propagation<K> delegate, KeyFactory<K> keyFactory,
-        Set<String> configuredStates) {
+      Set<String> configuredSystems) {
       this.delegate = delegate;
-      this.configuredStates = configuredStates;
+      this.configuredSystems = configuredSystems;
       this.samplingKey = keyFactory.create("sampling");
     }
 
@@ -126,7 +159,7 @@ public final class SecondarySampling {
     @Override public void inject(TraceContext traceContext, C carrier) {
       delegate.inject(traceContext, carrier);
       Extra sampled = traceContext.findExtra(Extra.class);
-      if (sampled == null || sampled.states.isEmpty()) return;
+      if (sampled == null || sampled.systems.isEmpty()) return;
       setter.put(carrier, samplingKey, sampled.toString());
     }
   }
@@ -134,13 +167,13 @@ public final class SecondarySampling {
   static final class Extractor<C, K> implements TraceContext.Extractor<C> {
     final TraceContext.Extractor<C> delegate;
     final Getter<C, K> getter;
-    final Set<String> configuredStates;
+    final Set<String> configuredSystems;
     final K samplingKey;
 
     Extractor(Propagation<K> propagation, Getter<C, K> getter) {
       this.delegate = propagation.delegate.extractor(getter);
       this.getter = getter;
-      this.configuredStates = propagation.configuredStates;
+      this.configuredSystems = propagation.configuredSystems;
       this.samplingKey = propagation.samplingKey;
     }
 
@@ -152,44 +185,48 @@ public final class SecondarySampling {
       TraceContextOrSamplingFlags.Builder builder = result.toBuilder().addExtra(extra);
       if (maybeValue == null) return builder.build();
 
-      for (String entry : Splitter.on(";").split(maybeValue)) {
-        String[] nameValue = entry.split(":");
+      for (String entry : maybeValue.split(";", 100)) {
+        String[] nameValue = entry.split(":", 2);
         String name = nameValue[0];
-        Map<String, String> state = Splitter.on(",").withKeyValueSeparator("=").split(nameValue[1]);
-
-        if (configuredStates.contains(name)) {
-          state = new LinkedHashMap<>(state); // make mutable
-          if (update(state)) {
-            if (state.get("sampled").equals("1")) {
-              builder.sampledLocal(); // this allows us to override the default decision
-            }
-            extra.states.put(name, state);
-          }
-        } else {
-          extra.states.put(name, state);
+
+        Map<String, String> systemToState = parseSystem(nameValue[1]);
+        if (configuredSystems.contains(name) && updateStateAndSample(systemToState)) {
+          builder.sampledLocal(); // this means data will be recorded
         }
+        if (!systemToState.isEmpty()) extra.systems.put(name, systemToState);
       }
 
       return builder.build();
     }
   }
 
-  static boolean update(Map<String, String> state) {
-    // if there's a rate, convert it to a sampling decision
-    String rate = state.remove("rate");
-    if (rate != null) {
+  static Map<String, String> parseSystem(String system) {
+    Map<String, String> result = new LinkedHashMap<>();
+    for (String entry : system.split(",", 100)) {
+      String[] nameValue = entry.split("=", 2);
+      result.put(nameValue[0], nameValue[1]);
+    }
+    return result;
+  }
+
+  static boolean updateStateAndSample(Map<String, String> state) {
+    // if there's a tps, convert it to a sampling decision
+    String tps = state.remove("tps");
+    if (tps != null) {
       // in real life the sampler would be cached
-      boolean decision = Sampler.create(Float.parseFloat(rate)).isSampled(1L);
+      boolean decision = RateLimitingSampler.create(Integer.parseInt(tps)).isSampled(1L);
       state.put("sampled", decision ? "1" : "0");
-    } else if (state.containsKey("ttl")) {
-      // decrement ttl if there is one
+      return decision;
+    }
+
+    if (state.containsKey("ttl")) { // decrement ttl if there is one
       String ttl = state.remove("ttl");
-      if (ttl != null && !ttl.equals("1")) {
-        state.put("ttl", Integer.toString(Integer.parseInt(ttl) - 1));
+      if (ttl.equals("1")) {
+        state.remove("sampled");
       } else {
-        return false; // remove the out-dated decision
+        state.put("ttl", Integer.toString(Integer.parseInt(ttl) - 1));
       }
     }
-    return true;
+    return "1".equals(state.get("sampled"));
   }
 }
diff --git a/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java b/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java
index 0852e90..c9677b0 100644
--- a/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java
+++ b/brave/src/test/java/brave/features/propagation/SecondarySamplingTest.java
@@ -20,12 +20,11 @@ import brave.Span;
 import brave.Span.Kind;
 import brave.Tracer;
 import brave.Tracing;
+import brave.features.propagation.SecondarySampling.Extra;
 import brave.handler.FinishedSpanHandler;
 import brave.handler.MutableSpan;
-import brave.propagation.B3SinglePropagation;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContextOrSamplingFlags;
-import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -33,33 +32,33 @@ import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
 
-import static brave.features.propagation.SecondarySampling.Extra;
 import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class SecondarySamplingTest {
   List<zipkin2.Span> zipkin = new ArrayList<>();
-  List<MutableSpan> zeus = new ArrayList<>(), apollo = new ArrayList<>();
-  Map<String, FinishedSpanHandler> stateToFinishedSpanHandler = ImmutableMap.of(
-      "zeus", new FinishedSpanHandler() {
-        @Override public boolean handle(TraceContext context, MutableSpan span) {
-          return zeus.add(span);
-        }
-      },
-      "apollo", new FinishedSpanHandler() {
-        @Override public boolean handle(TraceContext context, MutableSpan span) {
-          return apollo.add(span);
-        }
+  List<MutableSpan> edge = new ArrayList<>(), links = new ArrayList<>(), triage =
+    new ArrayList<>();
+  FinishedSpanHandler triageHandler = new FinishedSpanHandler() {
+    @Override public boolean handle(TraceContext context, MutableSpan span) {
+      return triage.add(span);
+    }
+  };
+
+  SecondarySampling secondarySampling = SecondarySampling.create()
+    .putSystem("edge", new FinishedSpanHandler() {
+      @Override public boolean handle(TraceContext context, MutableSpan span) {
+        return edge.add(span);
       }
-  );
-
-  Tracing tracing = Tracing.newBuilder()
-      .addFinishedSpanHandler(new SecondarySampling.FinishedSpanHandler(stateToFinishedSpanHandler))
-      .propagationFactory(new SecondarySampling.PropagationFactory(
-          B3SinglePropagation.FACTORY, stateToFinishedSpanHandler.keySet()
-      ))
-      .spanReporter(zipkin::add)
-      .build();
+    })
+    .putSystem("links", new FinishedSpanHandler() {
+      @Override public boolean handle(TraceContext context, MutableSpan span) {
+        return links.add(span);
+      }
+    });
+  Tracing tracing = secondarySampling.build(Tracing.newBuilder().spanReporter(zipkin::add));
 
   TraceContext.Extractor<Map<String, String>> extractor = tracing.propagation().extractor(Map::get);
   TraceContext.Injector<Map<String, String>> injector = tracing.propagation().injector(Map::put);
@@ -70,106 +69,142 @@ public class SecondarySamplingTest {
     tracing.close();
   }
 
-  /** This shows when primary trace status is not sampled, we can send to handlers anyway. */
+  /**
+   * This shows that when the primary trace status is not sampled, we can send to handlers anyway.
+   *
+   * <p>At first, "triage" is not configured, so the tracer ignores it. Later, it is configured, so
+   * it starts receiving traces.
+   */
   @Test public void integrationTest() {
     map.put("b3", "0");
-    map.put("sampling", "zeus:rate=1.0,ttl=3;apollo:sampled=1;wookie:rate=0.05");
+    map.put("sampling", "edge:tps=1,ttl=3;links:sampled=1;triage:tps=5");
 
     Tracer tracer = tracing.tracer();
     Span span1 = tracer.nextSpan(extractor.extract(map)).name("span1").kind(Kind.SERVER).start();
     Span span2 = tracer.newChild(span1.context()).kind(Kind.CLIENT).name("span2").start();
     injector.inject(span2.context(), map);
-    assertThat(map).containsEntry("sampling",
-        "zeus:ttl=3,sampled=1;apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "edge:ttl=3,sampled=1;links:sampled=1;triage:tps=5");
 
     // hop 1
-    Span span3 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    Span span3 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span4 = tracer.newChild(span3.context()).kind(Kind.CLIENT).name("span3").start();
     injector.inject(span4.context(), map);
-    assertThat(map).containsEntry("sampling",
-        "zeus:sampled=1,ttl=2;apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "edge:sampled=1,ttl=2;links:sampled=1;triage:tps=5");
 
     // hop 2
-    Span span5 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    Span span5 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span6 = tracer.newChild(span5.context()).kind(Kind.CLIENT).name("span4").start();
     injector.inject(span6.context(), map);
-    assertThat(map).containsEntry("sampling",
-        "zeus:sampled=1,ttl=1;apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "edge:sampled=1,ttl=1;links:sampled=1;triage:tps=5");
 
     // hop 3
-    Span span7 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    Span span7 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span8 = tracer.newChild(span7.context()).kind(Kind.CLIENT).name("span5").start();
     injector.inject(span8.context(), map);
-    assertThat(map).containsEntry("sampling", "apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "links:sampled=1;triage:tps=5");
 
-    // hop 4
-    Span span9 = tracer.joinSpan(extractor.extract(map).context()).kind(Kind.SERVER).start();
+    // dynamic configuration adds triage processing
+    secondarySampling.putSystem("triage", triageHandler);
+
+    // hop 4, triage is now sampled
+    Span span9 = tracer.nextSpan(extractor.extract(map)).kind(Kind.SERVER).start();
     Span span10 = tracer.newChild(span9.context()).kind(Kind.CLIENT).name("span6").start();
     injector.inject(span10.context(), map);
-    assertThat(map).containsEntry("sampling", "apollo:sampled=1;wookie:rate=0.05");
+    assertThat(map).containsEntry("sampling", "links:sampled=1;triage:sampled=1");
 
     asList(span1, span2, span3, span4, span5, span6, span7, span8, span9, span10)
-        .forEach(Span::finish);
+      .forEach(Span::finish);
 
     assertThat(zipkin).isEmpty();
-    assertThat(zeus).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(4);
-    assertThat(apollo).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(5);
+    assertThat(edge).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(3);
+    assertThat(links).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(5);
+    assertThat(triage).filteredOn(s -> s.kind() == Kind.SERVER).hasSize(1);
   }
 
   @Test public void extract_samplesLocalWhenConfigured() {
     map.put("b3", "0");
-    map.put("sampling", "apollo:sampled=0;wookie:rate=0.05");
+    map.put("sampling", "links:sampled=0;triage:tps=5");
 
     assertThat(extractor.extract(map).sampledLocal()).isFalse();
 
     map.put("b3", "0");
-    map.put("sampling", "apollo:sampled=0;wookie:sampled=1");
+    map.put("sampling", "links:sampled=0;triage:sampled=1");
 
     assertThat(extractor.extract(map).sampledLocal()).isFalse();
 
     map.put("b3", "0");
-    map.put("sampling", "apollo:sampled=1;wookie:rate=0.05");
+    map.put("sampling", "links:sampled=1;triage:tps=5");
+
+    assertThat(extractor.extract(map).sampledLocal()).isTrue();
+  }
+
+  /** This shows an example of dynamic configuration */
+  @Test public void dynamicConfiguration() {
+    // base case: links is configured, triage is not. triage is in the headers, though!
+    map.put("b3", "0");
+    map.put("sampling", "links:sampled=1;triage:tps=5");
 
     assertThat(extractor.extract(map).sampledLocal()).isTrue();
+
+    // dynamic configuration removes "links" processing
+    secondarySampling.removeSystem("links");
+    assertThat(extractor.extract(map).sampledLocal()).isFalse();
+
+    // dynamic configuration adds triage processing
+    secondarySampling.putSystem("triage", triageHandler);
+    assertThat(extractor.extract(map).sampledLocal()).isTrue();
+
+    tracing.tracer().nextSpan(extractor.extract(map)).start().finish();
+    assertThat(zipkin).isEmpty();
+    assertThat(edge).isEmpty();
+    assertThat(links).isEmpty(); // no longer configured
+    assertThat(triage).hasSize(1); // now configured
   }
 
-  @Test public void extract_convertsConfiguredRateToDecision() {
+  @Test public void extract_convertsConfiguredTpsToDecision() {
     map.put("b3", "0");
-    map.put("sampling", "zeus:rate=1.0,ttl=3;apollo:sampled=0;wookie:rate=0.05");
+    map.put("sampling", "edge:tps=1,ttl=3;links:sampled=0;triage:tps=5");
 
     TraceContextOrSamplingFlags extracted = extractor.extract(map);
     Extra extra = (Extra) extracted.extra().get(0);
-    assertThat(extra.states)
-        .containsEntry("zeus", ImmutableMap.of("sampled", "1", "ttl", "3"))
-        .containsEntry("apollo", ImmutableMap.of("sampled", "0"))
-        .containsEntry("wookie", ImmutableMap.of("rate", "0.05"));
+    assertThat(extra.systems)
+      .containsEntry("edge", twoEntryMap("sampled", "1", "ttl", "3"))
+      .containsEntry("links", singletonMap("sampled", "0"))
+      .containsEntry("triage", singletonMap("tps", "5")); // triage is not configured
   }
 
   @Test public void extract_decrementsTtlWhenConfigured() {
     map.put("b3", "0");
-    map.put("sampling", "zeus:sampled=1,ttl=3;apollo:sampled=0,ttl=1;wookie:rate=0.05");
+    map.put("sampling", "edge:sampled=1,ttl=3;links:sampled=0,ttl=1;triage:tps=5");
 
     TraceContextOrSamplingFlags extracted = extractor.extract(map);
     Extra extra = (Extra) extracted.extra().get(0);
-    assertThat(extra.states)
-        .containsEntry("zeus", ImmutableMap.of("sampled", "1", "ttl", "2"))
-        .doesNotContainKey("apollo")
-        .containsEntry("wookie", ImmutableMap.of("rate", "0.05"));
+    assertThat(extra.systems)
+      .containsEntry("edge", twoEntryMap("sampled", "1", "ttl", "2"))
+      .doesNotContainKey("links")
+      .containsEntry("triage", singletonMap("tps", "5"));
   }
 
-  @Test public void injectWritesAllStates() {
+  @Test public void injectWritesAllSystems() {
     Extra extra = new Extra();
-    extra.states.put("zeus", ImmutableMap.of("rate", "1.0", "ttl", "3"));
-    extra.states.put("apollo", ImmutableMap.of("sampled", "0"));
-    extra.states.put("wookie", ImmutableMap.of("rate", "0.05"));
+    extra.systems.put("edge", twoEntryMap("tps", "1", "ttl", "3"));
+    extra.systems.put("links", singletonMap("sampled", "0"));
+    extra.systems.put("triage", singletonMap("tps", "5"));
 
     injector.inject(TraceContext.newBuilder()
-        .traceId(1L).spanId(2L)
-        .sampled(false)
-        .extra(asList(extra))
-        .build(), map);
+      .traceId(1L).spanId(2L)
+      .sampled(false)
+      .extra(singletonList(extra))
+      .build(), map);
 
     assertThat(map)
-        .containsEntry("sampling", "zeus:rate=1.0,ttl=3;apollo:sampled=0;wookie:rate=0.05");
+      .containsEntry("sampling", "edge:tps=1,ttl=3;links:sampled=0;triage:tps=5");
+  }
+
+  static <K, V> Map<K, V> twoEntryMap(K key1, V value1, K key2, V value2) {
+    Map<K, V> result = new LinkedHashMap<>();
+    result.put(key1, value1);
+    result.put(key2, value2);
+    return result;
   }
 }