You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/06/03 03:28:08 UTC

[incubator-zipkin-brave] branch master updated: RateLimitingSampler parallel theory didn't run by default (#888)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db98d3  RateLimitingSampler parallel theory didn't run by default (#888)
4db98d3 is described below

commit 4db98d3a6f4f07e8edfdfa3f490a948f6eefe373
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Mon Jun 3 11:28:04 2019 +0800

    RateLimitingSampler parallel theory didn't run by default (#888)
    
    This fixes lack of rate limited sampler tests running.
---
 .../java/brave/sampler/RateLimitingSampler.java    |  5 +-
 .../brave/sampler/RateLimitingSamplerSoakTest.java | 88 ++++++++++++++++++++++
 .../brave/sampler/RateLimitingSamplerTest.java     | 31 ++++----
 3 files changed, 107 insertions(+), 17 deletions(-)

diff --git a/brave/src/main/java/brave/sampler/RateLimitingSampler.java b/brave/src/main/java/brave/sampler/RateLimitingSampler.java
index b16f7f3..9dfc6f0 100644
--- a/brave/src/main/java/brave/sampler/RateLimitingSampler.java
+++ b/brave/src/main/java/brave/sampler/RateLimitingSampler.java
@@ -67,9 +67,9 @@ public class RateLimitingSampler extends Sampler {
   }
 
   @Override public boolean isSampled(long ignoredTraceId) {
-    long now = System.nanoTime();
-    long updateAt = nextReset.get();
+    long now = System.nanoTime(), updateAt = nextReset.get();
 
+    // First task is to determine if this request is later than the one second sampling window
     long nanosUntilReset = -(now - updateAt); // because nanoTime can be negative
     if (nanosUntilReset <= 0) {
       // Attempt to move into the next sampling interval.
@@ -81,6 +81,7 @@ public class RateLimitingSampler extends Sampler {
       return isSampled(ignoredTraceId);
     }
 
+    // Now, we determine the amount of samples allowed for this interval, and sample accordingly
     int max = maxFunction.max(nanosUntilReset);
     int prev, next;
     do { // same form as java 8 AtomicLong.getAndUpdate
diff --git a/brave/src/test/java/brave/sampler/RateLimitingSamplerSoakTest.java b/brave/src/test/java/brave/sampler/RateLimitingSamplerSoakTest.java
new file mode 100644
index 0000000..bca68f1
--- /dev/null
+++ b/brave/src/test/java/brave/sampler/RateLimitingSamplerSoakTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.sampler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.experimental.theories.DataPoints;
+import org.junit.experimental.theories.Theories;
+import org.junit.experimental.theories.Theory;
+import org.junit.runner.RunWith;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+@RunWith(Theories.class)
+public class RateLimitingSamplerSoakTest {
+
+  @DataPoints public static final int[] SAMPLE_RESERVOIRS = {1, 11, 101, 1001, 1_000_001};
+
+  /** This test will take a little over a second per reservoir */
+  @Theory public void retainsPerSampleRate(int reservoir) throws Exception {
+    Sampler sampler = RateLimitingSampler.create(reservoir);
+
+    // We want to make sure we fill up the entire second, so
+    long startTick = System.nanoTime();
+    long lastDecisecond = startTick + TimeUnit.MILLISECONDS.toNanos(990);
+    // Because exacts don't work with Thread.sleep + operation overhead, set a deadline just under a second
+    long deadline = startTick + TimeUnit.MILLISECONDS.toNanos(998);
+
+    AtomicBoolean hitLastDecisecond = new AtomicBoolean();
+
+    AtomicLong passed = new AtomicLong();
+    Runnable sample = () -> {
+      long tick = System.nanoTime();
+      if (tick > deadline) return;
+      if (tick >= lastDecisecond) hitLastDecisecond.set(true);
+
+      if (sampler.isSampled(0L)) passed.incrementAndGet();
+    };
+
+    Runnable loopAndSample = () -> {
+      do {
+        if (reservoir > 10) {  // execute one tenth of our reservoir
+          for (int j = 0; j < reservoir / 10; j++) sample.run();
+        } else {// don't divide by 10!
+          sample.run();
+        }
+        sleepUninterruptibly(9, TimeUnit.MILLISECONDS);
+      } while (System.nanoTime() < deadline);
+    };
+
+    int threadCount = 10;
+    ExecutorService service = Executors.newFixedThreadPool(threadCount);
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < threadCount; i++) {
+      futures.add(service.submit(loopAndSample));
+    }
+
+    service.shutdown();
+    service.awaitTermination(1, TimeUnit.SECONDS);
+
+    assertThat(passed.get()).isEqualTo(reservoir);
+    assumeThat(hitLastDecisecond.get())
+        .withFailMessage("ran out of samples before the end of the second")
+        .isTrue();
+  }
+}
diff --git a/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java b/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
index 0c4881e..7035f31 100644
--- a/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
+++ b/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
@@ -16,11 +16,8 @@
  */
 package brave.sampler;
 
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
-import org.junit.experimental.theories.DataPoints;
-import org.junit.experimental.theories.Theory;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -28,7 +25,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import static brave.sampler.RateLimitingSampler.NANOS_PER_DECISECOND;
 import static brave.sampler.RateLimitingSampler.NANOS_PER_SECOND;
-import static brave.sampler.SamplerTest.INPUT_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -52,6 +48,22 @@ public class RateLimitingSamplerTest {
     assertThat(sampler.isSampled(0L)).isFalse();
   }
 
+  @Test public void edgeCases() {
+    mockStatic(System.class);
+    when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
+    Sampler sampler = RateLimitingSampler.create(2);
+
+    // exact moment of reset
+    when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
+    assertThat(sampler.isSampled(0L)).isTrue();
+
+    // right before next interval
+    when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_SECOND - 1);
+    assertThat(sampler.isSampled(0L)).isTrue();
+    when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_SECOND - 1);
+    assertThat(sampler.isSampled(0L)).isFalse();
+  }
+
   @Test public void resetsAfterASecond() {
     mockStatic(System.class);
 
@@ -157,17 +169,6 @@ public class RateLimitingSamplerTest {
     assertThat(sampler.isSampled(0L)).isTrue();
   }
 
-  @DataPoints public static final int[] SAMPLE_RESERVOIRS = {1, 10, 100};
-
-  @Theory public void retainsPerSampleRate(int reservoir) {
-    Sampler sampler = RateLimitingSampler.create(reservoir);
-
-    // parallel to ensure there aren't any unsynchronized race conditions
-    long passed = new Random().longs(INPUT_SIZE).parallel().filter(sampler::isSampled).count();
-
-    assertThat(passed).isEqualTo(reservoir);
-  }
-
   @Test public void zeroMeansDropAllTraces() {
     assertThat(RateLimitingSampler.create(0)).isSameAs(Sampler.NEVER_SAMPLE);
   }