You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/06/03 03:28:08 UTC
[incubator-zipkin-brave] branch master updated: RateLimitingSampler
parallel theory didn't run by default (#888)
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git
The following commit(s) were added to refs/heads/master by this push:
new 4db98d3 RateLimitingSampler parallel theory didn't run by default (#888)
4db98d3 is described below
commit 4db98d3a6f4f07e8edfdfa3f490a948f6eefe373
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Mon Jun 3 11:28:04 2019 +0800
RateLimitingSampler parallel theory didn't run by default (#888)
This fixes the rate-limiting sampler's parallel theory test, which was not being run.
---
.../java/brave/sampler/RateLimitingSampler.java | 5 +-
.../brave/sampler/RateLimitingSamplerSoakTest.java | 88 ++++++++++++++++++++++
.../brave/sampler/RateLimitingSamplerTest.java | 31 ++++----
3 files changed, 107 insertions(+), 17 deletions(-)
diff --git a/brave/src/main/java/brave/sampler/RateLimitingSampler.java b/brave/src/main/java/brave/sampler/RateLimitingSampler.java
index b16f7f3..9dfc6f0 100644
--- a/brave/src/main/java/brave/sampler/RateLimitingSampler.java
+++ b/brave/src/main/java/brave/sampler/RateLimitingSampler.java
@@ -67,9 +67,9 @@ public class RateLimitingSampler extends Sampler {
}
@Override public boolean isSampled(long ignoredTraceId) {
- long now = System.nanoTime();
- long updateAt = nextReset.get();
+ long now = System.nanoTime(), updateAt = nextReset.get();
+ // The first task is to determine whether this request falls after the current one-second sampling window
long nanosUntilReset = -(now - updateAt); // because nanoTime can be negative
if (nanosUntilReset <= 0) {
// Attempt to move into the next sampling interval.
@@ -81,6 +81,7 @@ public class RateLimitingSampler extends Sampler {
return isSampled(ignoredTraceId);
}
+ // Now, we determine the number of samples allowed for this interval, and sample accordingly
int max = maxFunction.max(nanosUntilReset);
int prev, next;
do { // same form as java 8 AtomicLong.getAndUpdate
diff --git a/brave/src/test/java/brave/sampler/RateLimitingSamplerSoakTest.java b/brave/src/test/java/brave/sampler/RateLimitingSamplerSoakTest.java
new file mode 100644
index 0000000..bca68f1
--- /dev/null
+++ b/brave/src/test/java/brave/sampler/RateLimitingSamplerSoakTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.sampler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.experimental.theories.DataPoints;
+import org.junit.experimental.theories.Theories;
+import org.junit.experimental.theories.Theory;
+import org.junit.runner.RunWith;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+@RunWith(Theories.class)
+public class RateLimitingSamplerSoakTest {
+
+ @DataPoints public static final int[] SAMPLE_RESERVOIRS = {1, 11, 101, 1001, 1_000_001};
+
+ /** Samples from 10 threads for just under a second and expects exactly {@code reservoir} to pass; takes a little over a second per reservoir. */
+ @Theory public void retainsPerSampleRate(int reservoir) throws Exception {
+ Sampler sampler = RateLimitingSampler.create(reservoir);
+
+ // We want sampling to span the entire second, so mark the tick 990ms in (despite the name, that is the final 10ms, not 100ms)
+ long startTick = System.nanoTime();
+ long lastDecisecond = startTick + TimeUnit.MILLISECONDS.toNanos(990);
+ // Exact timing isn't achievable given Thread.sleep + operation overhead, so set the deadline just under a second
+ long deadline = startTick + TimeUnit.MILLISECONDS.toNanos(998);
+
+ AtomicBoolean hitLastDecisecond = new AtomicBoolean();
+
+ AtomicLong passed = new AtomicLong();
+ Runnable sample = () -> {
+ long tick = System.nanoTime();
+ if (tick > deadline) return; // past the deadline: make no further sampling attempts
+ if (tick >= lastDecisecond) hitLastDecisecond.set(true); // an attempt happened at or after the 990ms mark
+
+ if (sampler.isSampled(0L)) passed.incrementAndGet();
+ };
+
+ Runnable loopAndSample = () -> {
+ do {
+ if (reservoir > 10) { // each pass attempts reservoir / 10 samples
+ for (int j = 0; j < reservoir / 10; j++) sample.run();
+ } else {// small reservoir: reservoir / 10 would truncate toward zero, so attempt one sample per pass
+ sample.run();
+ }
+ sleepUninterruptibly(9, TimeUnit.MILLISECONDS); // presumably spaces the passes out across the second; confirm the choice of 9ms
+ } while (System.nanoTime() < deadline);
+ };
+
+ int threadCount = 10;
+ ExecutorService service = Executors.newFixedThreadPool(threadCount);
+ List<Future<?>> futures = new ArrayList<>(); // NOTE(review): populated below but never read afterwards
+ for (int i = 0; i < threadCount; i++) {
+ futures.add(service.submit(loopAndSample));
+ }
+
+ service.shutdown();
+ service.awaitTermination(1, TimeUnit.SECONDS); // waits at most one second; the boolean result is not checked
+
+ assertThat(passed.get()).isEqualTo(reservoir); // exactly the reservoir size must have been sampled
+ assumeThat(hitLastDecisecond.get())
+ .withFailMessage("ran out of samples before the end of the second")
+ .isTrue();
+ }
+}
diff --git a/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java b/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
index 0c4881e..7035f31 100644
--- a/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
+++ b/brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
@@ -16,11 +16,8 @@
*/
package brave.sampler;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.junit.experimental.theories.DataPoints;
-import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -28,7 +25,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static brave.sampler.RateLimitingSampler.NANOS_PER_DECISECOND;
import static brave.sampler.RateLimitingSampler.NANOS_PER_SECOND;
-import static brave.sampler.SamplerTest.INPUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -52,6 +48,22 @@ public class RateLimitingSamplerTest {
assertThat(sampler.isSampled(0L)).isFalse();
}
+ @Test public void edgeCases() {
+ mockStatic(System.class); // lets the test stub System.nanoTime() below
+ when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
+ Sampler sampler = RateLimitingSampler.create(2);
+
+ // exact moment of reset: the clock still reads the value it had when the sampler was created, and the first sample passes
+ when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
+ assertThat(sampler.isSampled(0L)).isTrue();
+
+ // right before the next interval: one nanosecond short of a full second later, the second sample passes and the third is rejected
+ when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_SECOND - 1);
+ assertThat(sampler.isSampled(0L)).isTrue();
+ when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_SECOND - 1);
+ assertThat(sampler.isSampled(0L)).isFalse();
+ }
+
@Test public void resetsAfterASecond() {
mockStatic(System.class);
@@ -157,17 +169,6 @@ public class RateLimitingSamplerTest {
assertThat(sampler.isSampled(0L)).isTrue();
}
- @DataPoints public static final int[] SAMPLE_RESERVOIRS = {1, 10, 100};
-
- @Theory public void retainsPerSampleRate(int reservoir) {
- Sampler sampler = RateLimitingSampler.create(reservoir);
-
- // parallel to ensure there aren't any unsynchronized race conditions
- long passed = new Random().longs(INPUT_SIZE).parallel().filter(sampler::isSampled).count();
-
- assertThat(passed).isEqualTo(reservoir);
- }
-
@Test public void zeroMeansDropAllTraces() {
assertThat(RateLimitingSampler.create(0)).isSameAs(Sampler.NEVER_SAMPLE);
}