You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/10 09:59:32 UTC
[incubator-zipkin] 01/01: Makes it an error to store during
assembly of a call
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch error-io-on-assembly
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
commit 8300976933d3f6700c7dec8bf5f82754a5699e56
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Fri May 10 17:57:51 2019 +0800
Makes it an error to store during assembly of a call
Before this, there was some extra code in the throttle package handling
a bug in our in memory storage. This fixes that and removes the extra
code.
See #2502
---
.../server/internal/throttle/ThrottledCall.java | 59 +++++-------------
.../throttle/ThrottledStorageComponent.java | 4 +-
.../server/internal/throttle/ThrottledCallTest.kt | 72 ++++++++--------------
.../src/main/java/zipkin2/storage/ITSpanStore.java | 56 +++++++++++++++++
.../main/java/zipkin2/storage/InMemoryStorage.java | 39 +++++++++++-
.../java/zipkin2/storage/InMemoryStorageTest.java | 14 ++---
6 files changed, 142 insertions(+), 102 deletions(-)
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
index f43d61e..67aa71d 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -24,10 +24,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
import zipkin2.Call;
import zipkin2.Callback;
-import zipkin2.storage.InMemoryStorage;
/**
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
@@ -41,39 +39,21 @@ import zipkin2.storage.InMemoryStorage;
*
* @see ThrottledStorageComponent
*/
-final class ThrottledCall<V> extends Call<V> {
+final class ThrottledCall<V> extends Call.Base<V> {
final ExecutorService executor;
final Limiter<Void> limiter;
- final Listener limitListener;
- /**
- * supplier call needs to be supplied later to avoid having it take action when it is created
- * (like {@link InMemoryStorage} and thus avoid being throttled.
- */
- final Supplier<? extends Call<V>> supplier;
- volatile Call<V> delegate;
- volatile boolean canceled;
-
- public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
- Supplier<? extends Call<V>> supplier) {
+ final Call<V> delegate;
+
+ ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
this.executor = executor;
this.limiter = limiter;
- this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
- this.supplier = supplier;
+ this.delegate = delegate;
}
- // TODO: refactor this when in-memory no longer executes storage ops during assembly time
- ThrottledCall(ThrottledCall<V> other) {
- this(other.executor, other.limiter,
- other.delegate == null ? other.supplier : () -> other.delegate.clone());
- }
+ @Override protected V doExecute() throws IOException {
+ Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
- // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
- // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
- // the need for assembly time throttling (fix to in-memory storage)
- @Override public V execute() throws IOException {
try {
- delegate = supplier.get();
-
// Make sure we throttle
Future<V> future = executor.submit(() -> {
String oldName = setCurrentThreadName(delegate.toString());
@@ -115,9 +95,11 @@ final class ThrottledCall<V> extends Call<V> {
}
}
- @Override public void enqueue(Callback<V> callback) {
+ @Override protected void doEnqueue(Callback<V> callback) {
+ Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+
try {
- executor.execute(new QueuedCall(callback));
+ executor.execute(new QueuedCall(callback, limitListener));
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
@@ -127,21 +109,12 @@ final class ThrottledCall<V> extends Call<V> {
}
}
- @Override public void cancel() {
- canceled = true;
- if (delegate != null) delegate.cancel();
- }
-
- @Override public boolean isCanceled() {
- return canceled || (delegate != null && delegate.isCanceled());
- }
-
@Override public Call<V> clone() {
- return new ThrottledCall<>(this);
+ return new ThrottledCall<>(executor, limiter, delegate.clone());
}
@Override public String toString() {
- return "Throttled" + supplier;
+ return "Throttled" + delegate;
}
static String setCurrentThreadName(String name) {
@@ -153,17 +126,17 @@ final class ThrottledCall<V> extends Call<V> {
final class QueuedCall implements Runnable {
final Callback<V> callback;
+ final Listener limitListener;
- QueuedCall(Callback<V> callback) {
+ QueuedCall(Callback<V> callback, Listener limitListener) {
this.callback = callback;
+ this.limitListener = limitListener;
}
@Override public void run() {
try {
if (isCanceled()) return;
- delegate = ThrottledCall.this.supplier.get();
-
String oldName = setCurrentThreadName(delegate.toString());
try {
enqueueAndWait();
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
index 91e7b78..1422232 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -104,7 +104,7 @@ public final class ThrottledStorageComponent extends StorageComponent {
return "Throttled" + delegate;
}
- final class ThrottledSpanConsumer implements SpanConsumer {
+ static final class ThrottledSpanConsumer implements SpanConsumer {
final SpanConsumer delegate;
final Limiter<Void> limiter;
final ExecutorService executor;
@@ -116,7 +116,7 @@ public final class ThrottledStorageComponent extends StorageComponent {
}
@Override public Call<Void> accept(List<Span> spans) {
- return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+ return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
}
@Override public String toString() {
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
index 00eb02f..b729e9a 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener
import com.netflix.concurrency.limits.limit.SettableLimit
import com.netflix.concurrency.limits.limiter.SimpleLimiter
import org.assertj.core.api.Assertions.assertThat
+import org.junit.After
import org.junit.Test
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
@@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.Semaphore
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
-import java.util.function.Supplier
-// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
-// refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
class ThrottledCallTest {
- var limit = SettableLimit.startingAt(0)
- var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+ val limit = SettableLimit.startingAt(0)
+ val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
- inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+ val numThreads = 1
+ val executor = Executors.newSingleThreadExecutor();
+ @After fun shutdownExecutor() = executor.shutdown()
- @Test fun callCreation_isDeferred() {
- val created = booleanArrayOf(false)
+ inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
- val throttle = createThrottle(Supplier {
- created[0] = true
- Call.create<Void>(null)
- })
+ @Test fun niceToString() {
+ val delegate: Call<Void> = mock()
+ `when`(delegate.toString()).thenReturn("StoreSpansCall{}")
- assertThat(created).contains(false)
- throttle.execute()
- assertThat(created).contains(true)
+ assertThat(ThrottledCall(executor, limiter, delegate))
+ .hasToString("ThrottledStoreSpansCall{}")
}
@Test fun execute_isThrottled() {
- val numThreads = 1
val queueSize = 1
val totalTasks = numThreads + queueSize
+ limit.limit = totalTasks
val startLock = Semaphore(numThreads)
val waitLock = Semaphore(totalTasks)
val failLock = Semaphore(1)
- val throttle =
- createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+ val throttled = throttle(LockedCall(startLock, waitLock))
// Step 1: drain appropriate locks
startLock.drainPermits()
@@ -83,7 +79,7 @@ class ThrottledCallTest {
// Step 2: saturate threads and fill queue
val backgroundPool = Executors.newCachedThreadPool()
for (i in 0 until totalTasks) {
- backgroundPool.submit(Callable { throttle.execute() })
+ backgroundPool.submit(Callable { throttled.clone().execute() })
}
try {
@@ -93,7 +89,7 @@ class ThrottledCallTest {
// Step 4: submit something beyond our limits
val future = backgroundPool.submit(Callable {
try {
- throttle.execute()
+ throttled.execute()
} catch (e: IOException) {
throw RuntimeException(e)
} finally {
@@ -125,7 +121,7 @@ class ThrottledCallTest {
val call = FakeCall()
call.overCapacity = true
- val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+ val throttle = ThrottledCall(executor, mockLimiter(listener), call)
try {
throttle.execute()
assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -137,8 +133,7 @@ class ThrottledCallTest {
@Test fun execute_ignoresLimit_whenPoolFull() {
val listener: Listener = mock()
- val throttle =
- ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+ val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
try {
throttle.execute()
assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -148,14 +143,13 @@ class ThrottledCallTest {
}
@Test fun enqueue_isThrottled() {
- val numThreads = 1
val queueSize = 1
val totalTasks = numThreads + queueSize
+ limit.limit = totalTasks
val startLock = Semaphore(numThreads)
val waitLock = Semaphore(totalTasks)
- val throttle =
- createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+ val throttle = throttle(LockedCall(startLock, waitLock))
// Step 1: drain appropriate locks
startLock.drainPermits()
@@ -164,7 +158,7 @@ class ThrottledCallTest {
// Step 2: saturate threads and fill queue
val callback: Callback<Void> = mock()
for (i in 0 until totalTasks) {
- throttle.enqueue(callback)
+ throttle.clone().enqueue(callback)
}
// Step 3: make sure the threads actually started
@@ -172,7 +166,7 @@ class ThrottledCallTest {
try {
// Step 4: submit something beyond our limits and make sure it fails
- throttle.enqueue(callback)
+ throttle.clone().enqueue(callback)
assertThat(true).isFalse() // should raise a RejectedExecutionException
} catch (e: RejectedExecutionException) {
@@ -187,7 +181,7 @@ class ThrottledCallTest {
val call = FakeCall()
call.overCapacity = true
- val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+ val throttle = ThrottledCall(executor, mockLimiter(listener), call)
val latch = CountDownLatch(1)
throttle.enqueue(object : Callback<Void> {
override fun onSuccess(value: Void) {
@@ -207,7 +201,7 @@ class ThrottledCallTest {
val listener: Listener = mock()
val throttle =
- ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+ ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
try {
throttle.enqueue(null)
assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -216,18 +210,7 @@ class ThrottledCallTest {
}
}
- private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
- return createThrottle(1, 1, delegate)
- }
-
- private fun createThrottle(
- poolSize: Int,
- queueSize: Int,
- delegate: Supplier<Call<Void>>
- ): ThrottledCall<Void> {
- limit.setLimit(limit.getLimit() + 1)
- return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
- }
+ private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate)
private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
override fun doExecute(): Void? {
@@ -252,11 +235,6 @@ class ThrottledCallTest {
override fun clone() = LockedCall(startLock, waitLock);
}
- private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
- return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
- LinkedBlockingQueue(queueSize))
- }
-
private fun mockExhaustedPool(): ExecutorService {
val mock: ExecutorService = mock()
doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
diff --git a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
index edac7a5..a883065 100644
--- a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
+++ b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
@@ -23,17 +23,21 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
+import zipkin2.Call;
+import zipkin2.Callback;
import zipkin2.Endpoint;
import zipkin2.Span;
import zipkin2.internal.Trace;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static zipkin2.TestObjects.BACKEND;
import static zipkin2.TestObjects.CLIENT_SPAN;
import static zipkin2.TestObjects.DAY;
@@ -106,6 +110,58 @@ public abstract class ITSpanStore {
allShouldWorkWhenEmpty();
}
+ @Test public void consumer_properlyImplementsCallContract_execute() throws IOException {
+ Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+
+ // Ensure the implementation didn't accidentally do I/O at assembly time.
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+ call.execute();
+
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+ .containsExactly(LOTS_OF_SPANS[0]);
+
+ try {
+ call.execute();
+ failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+ } catch (IllegalStateException e) {
+ }
+
+ // no problem to clone a call
+ call.clone().execute();
+ }
+
+ @Test public void consumer_properlyImplementsCallContract_submit() throws Exception {
+ Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+ // Ensure the implementation didn't accidentally do I/O at assembly time.
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Callback<Void> callback = new Callback<Void>() {
+ @Override public void onSuccess(Void value) {
+ latch.countDown();
+ }
+
+ @Override public void onError(Throwable t) {
+ latch.countDown();
+ }
+ };
+
+ call.enqueue(callback);
+ latch.await();
+
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+ .containsExactly(LOTS_OF_SPANS[0]);
+
+ try {
+ call.enqueue(callback);
+ failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+ } catch (IllegalStateException e) {
+ }
+
+ // no problem to clone a call
+ call.clone().execute();
+ }
+
/**
* Ideally, storage backends can deduplicate identical documents as this will prevent some
* analysis problems such as double-counting dependency links or other statistics. While this test
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index ce72238..88d2195 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import zipkin2.Call;
+import zipkin2.Callback;
import zipkin2.DependencyLink;
import zipkin2.Endpoint;
import zipkin2.Span;
@@ -189,8 +190,11 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
autocompleteTags.clear();
}
- @Override
- public synchronized Call<Void> accept(List<Span> spans) {
+ @Override public Call<Void> accept(List<Span> spans) {
+ return new StoreSpansCall(spans);
+ }
+
+ synchronized void doAccept(List<Span> spans) {
int delta = spans.size();
int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount;
evictToRecoverSpans(spansToRecover);
@@ -221,7 +225,36 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
}
}
}
- return Call.create(null /* Void == null */);
+ }
+
+ final class StoreSpansCall extends Call.Base<Void> {
+ final List<Span> spans;
+
+ StoreSpansCall(List<Span> spans) {
+ this.spans = spans;
+ }
+
+ @Override protected Void doExecute() {
+ doAccept(spans);
+ return null;
+ }
+
+ @Override protected void doEnqueue(Callback<Void> callback) {
+ try {
+ callback.onSuccess(doExecute());
+ } catch (RuntimeException | Error e) {
+ Call.propagateIfFatal(e);
+ callback.onError(e);
+ }
+ }
+
+ @Override public Call<Void> clone() {
+ return new StoreSpansCall(spans);
+ }
+
+ @Override public String toString() {
+ return "StoreSpansCall{" + spans + "}";
+ }
}
/** Returns the count of spans evicted. */
diff --git a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
index 24f617f..7ed0d61 100644
--- a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
+++ b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
@@ -83,9 +83,9 @@ public class InMemoryStorageTest {
}
/** Ensures we don't overload a partition due to key equality being conflated with order */
- @Test public void differentiatesOnTraceIdWhenTimestampEqual() {
- storage.accept(asList(CLIENT_SPAN));
- storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build()));
+ @Test public void differentiatesOnTraceIdWhenTimestampEqual() throws IOException {
+ storage.accept(asList(CLIENT_SPAN)).execute();
+ storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())).execute();
assertThat(storage).extracting("spansByTraceIdTimeStamp.delegate")
.allSatisfy(map -> assertThat((Map) map).hasSize(2));
@@ -100,8 +100,8 @@ public class InMemoryStorageTest {
.timestamp(TODAY * 1000)
.build();
- storage.accept(asList(span));
- storage.accept(asList(span));
+ storage.accept(asList(span)).execute();
+ storage.accept(asList(span)).execute();
assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly(
DependencyLink.newBuilder().parent("kafka").child("app").callCount(1L).build()
@@ -119,7 +119,7 @@ public class InMemoryStorageTest {
.timestamp(TODAY * 1000)
.build();
- storage.accept(asList(span1, span2));
+ storage.accept(asList(span1, span2)).execute();
assertThat(storage.getSpanNames("app").execute()).containsOnly(
"root"
@@ -153,7 +153,7 @@ public class InMemoryStorageTest {
.putTag("http.path", "/users")
.timestamp(TODAY * 1000)
.build();
- storage.accept(asList(span1, span2, span3, span4));
+ storage.accept(asList(span1, span2, span3, span4)).execute();
assertThat(storage.getKeys().execute()).containsOnlyOnce("http.path");
assertThat(storage.getValues("http.path").execute()).containsOnlyOnce("/users");