You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/10 09:59:32 UTC

[incubator-zipkin] 01/01: Makes it an error to store during assembly of a call

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch error-io-on-assembly
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit 8300976933d3f6700c7dec8bf5f82754a5699e56
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Fri May 10 17:57:51 2019 +0800

    Makes it an error to store during assembly of a call
    
    Before this, the throttle package carried extra code to work around a
    bug in our in-memory storage: it stored spans when a call was assembled
    instead of when it was executed. This fixes that bug and removes the
    extra code.
    
    See #2502
---
 .../server/internal/throttle/ThrottledCall.java    | 59 +++++-------------
 .../throttle/ThrottledStorageComponent.java        |  4 +-
 .../server/internal/throttle/ThrottledCallTest.kt  | 72 ++++++++--------------
 .../src/main/java/zipkin2/storage/ITSpanStore.java | 56 +++++++++++++++++
 .../main/java/zipkin2/storage/InMemoryStorage.java | 39 +++++++++++-
 .../java/zipkin2/storage/InMemoryStorageTest.java  | 14 ++---
 6 files changed, 142 insertions(+), 102 deletions(-)

diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
index f43d61e..67aa71d 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -24,10 +24,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
 import zipkin2.Call;
 import zipkin2.Callback;
-import zipkin2.storage.InMemoryStorage;
 
 /**
  * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
@@ -41,39 +39,21 @@ import zipkin2.storage.InMemoryStorage;
  *
  * @see ThrottledStorageComponent
  */
-final class ThrottledCall<V> extends Call<V> {
+final class ThrottledCall<V> extends Call.Base<V> {
   final ExecutorService executor;
   final Limiter<Void> limiter;
-  final Listener limitListener;
-  /**
-   * supplier call needs to be supplied later to avoid having it take action when it is created
-   * (like {@link InMemoryStorage} and thus avoid being throttled.
-   */
-  final Supplier<? extends Call<V>> supplier;
-  volatile Call<V> delegate;
-  volatile boolean canceled;
-
-  public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
-    Supplier<? extends Call<V>> supplier) {
+  final Call<V> delegate;
+
+  ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
     this.executor = executor;
     this.limiter = limiter;
-    this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
-    this.supplier = supplier;
+    this.delegate = delegate;
   }
 
-  // TODO: refactor this when in-memory no longer executes storage ops during assembly time
-  ThrottledCall(ThrottledCall<V> other) {
-    this(other.executor, other.limiter,
-      other.delegate == null ? other.supplier : () -> other.delegate.clone());
-  }
+  @Override protected V doExecute() throws IOException {
+    Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
 
-  // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
-  // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
-  // the need for assembly time throttling (fix to in-memory storage)
-  @Override public V execute() throws IOException {
     try {
-      delegate = supplier.get();
-
       // Make sure we throttle
       Future<V> future = executor.submit(() -> {
         String oldName = setCurrentThreadName(delegate.toString());
@@ -115,9 +95,11 @@ final class ThrottledCall<V> extends Call<V> {
     }
   }
 
-  @Override public void enqueue(Callback<V> callback) {
+  @Override protected void doEnqueue(Callback<V> callback) {
+    Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+
     try {
-      executor.execute(new QueuedCall(callback));
+      executor.execute(new QueuedCall(callback, limitListener));
     } catch (RuntimeException | Error e) {
       propagateIfFatal(e);
       // Ignoring in all cases here because storage itself isn't saying we need to throttle.  Though, we may still be
@@ -127,21 +109,12 @@ final class ThrottledCall<V> extends Call<V> {
     }
   }
 
-  @Override public void cancel() {
-    canceled = true;
-    if (delegate != null) delegate.cancel();
-  }
-
-  @Override public boolean isCanceled() {
-    return canceled || (delegate != null && delegate.isCanceled());
-  }
-
   @Override public Call<V> clone() {
-    return new ThrottledCall<>(this);
+    return new ThrottledCall<>(executor, limiter, delegate.clone());
   }
 
   @Override public String toString() {
-    return "Throttled" + supplier;
+    return "Throttled" + delegate;
   }
 
   static String setCurrentThreadName(String name) {
@@ -153,17 +126,17 @@ final class ThrottledCall<V> extends Call<V> {
 
   final class QueuedCall implements Runnable {
     final Callback<V> callback;
+    final Listener limitListener;
 
-    QueuedCall(Callback<V> callback) {
+    QueuedCall(Callback<V> callback, Listener limitListener) {
       this.callback = callback;
+      this.limitListener = limitListener;
     }
 
     @Override public void run() {
       try {
         if (isCanceled()) return;
 
-        delegate = ThrottledCall.this.supplier.get();
-
         String oldName = setCurrentThreadName(delegate.toString());
         try {
           enqueueAndWait();
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
index 91e7b78..1422232 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -104,7 +104,7 @@ public final class ThrottledStorageComponent extends StorageComponent {
     return "Throttled" + delegate;
   }
 
-  final class ThrottledSpanConsumer implements SpanConsumer {
+  static final class ThrottledSpanConsumer implements SpanConsumer {
     final SpanConsumer delegate;
     final Limiter<Void> limiter;
     final ExecutorService executor;
@@ -116,7 +116,7 @@ public final class ThrottledStorageComponent extends StorageComponent {
     }
 
     @Override public Call<Void> accept(List<Span> spans) {
-      return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+      return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
     }
 
     @Override public String toString() {
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
index 00eb02f..b729e9a 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener
 import com.netflix.concurrency.limits.limit.SettableLimit
 import com.netflix.concurrency.limits.limiter.SimpleLimiter
 import org.assertj.core.api.Assertions.assertThat
+import org.junit.After
 import org.junit.Test
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
@@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException
 import java.util.concurrent.Semaphore
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit
-import java.util.function.Supplier
 
-// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
-//  refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
 class ThrottledCallTest {
-  var limit = SettableLimit.startingAt(0)
-  var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+  val limit = SettableLimit.startingAt(0)
+  val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
 
-  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+  val numThreads = 1
+  val executor = Executors.newSingleThreadExecutor();
+  @After fun shutdownExecutor() = executor.shutdown()
 
-  @Test fun callCreation_isDeferred() {
-    val created = booleanArrayOf(false)
+  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
 
-    val throttle = createThrottle(Supplier {
-      created[0] = true
-      Call.create<Void>(null)
-    })
+  @Test fun niceToString() {
+    val delegate: Call<Void> = mock()
+    `when`(delegate.toString()).thenReturn("StoreSpansCall{}")
 
-    assertThat(created).contains(false)
-    throttle.execute()
-    assertThat(created).contains(true)
+    assertThat(ThrottledCall(executor, limiter, delegate))
+      .hasToString("ThrottledStoreSpansCall{}")
   }
 
   @Test fun execute_isThrottled() {
-    val numThreads = 1
     val queueSize = 1
     val totalTasks = numThreads + queueSize
+    limit.limit = totalTasks
 
     val startLock = Semaphore(numThreads)
     val waitLock = Semaphore(totalTasks)
     val failLock = Semaphore(1)
-    val throttle =
-      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+    val throttled = throttle(LockedCall(startLock, waitLock))
 
     // Step 1: drain appropriate locks
     startLock.drainPermits()
@@ -83,7 +79,7 @@ class ThrottledCallTest {
     // Step 2: saturate threads and fill queue
     val backgroundPool = Executors.newCachedThreadPool()
     for (i in 0 until totalTasks) {
-      backgroundPool.submit(Callable { throttle.execute() })
+      backgroundPool.submit(Callable { throttled.clone().execute() })
     }
 
     try {
@@ -93,7 +89,7 @@ class ThrottledCallTest {
       // Step 4: submit something beyond our limits
       val future = backgroundPool.submit(Callable {
         try {
-          throttle.execute()
+          throttled.execute()
         } catch (e: IOException) {
           throw RuntimeException(e)
         } finally {
@@ -125,7 +121,7 @@ class ThrottledCallTest {
     val call = FakeCall()
     call.overCapacity = true
 
-    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+    val throttle = ThrottledCall(executor, mockLimiter(listener), call)
     try {
       throttle.execute()
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -137,8 +133,7 @@ class ThrottledCallTest {
   @Test fun execute_ignoresLimit_whenPoolFull() {
     val listener: Listener = mock()
 
-    val throttle =
-      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+    val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
     try {
       throttle.execute()
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -148,14 +143,13 @@ class ThrottledCallTest {
   }
 
   @Test fun enqueue_isThrottled() {
-    val numThreads = 1
     val queueSize = 1
     val totalTasks = numThreads + queueSize
+    limit.limit = totalTasks
 
     val startLock = Semaphore(numThreads)
     val waitLock = Semaphore(totalTasks)
-    val throttle =
-      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+    val throttle = throttle(LockedCall(startLock, waitLock))
 
     // Step 1: drain appropriate locks
     startLock.drainPermits()
@@ -164,7 +158,7 @@ class ThrottledCallTest {
     // Step 2: saturate threads and fill queue
     val callback: Callback<Void> = mock()
     for (i in 0 until totalTasks) {
-      throttle.enqueue(callback)
+      throttle.clone().enqueue(callback)
     }
 
     // Step 3: make sure the threads actually started
@@ -172,7 +166,7 @@ class ThrottledCallTest {
 
     try {
       // Step 4: submit something beyond our limits and make sure it fails
-      throttle.enqueue(callback)
+      throttle.clone().enqueue(callback)
 
       assertThat(true).isFalse() // should raise a RejectedExecutionException
     } catch (e: RejectedExecutionException) {
@@ -187,7 +181,7 @@ class ThrottledCallTest {
     val call = FakeCall()
     call.overCapacity = true
 
-    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+    val throttle = ThrottledCall(executor, mockLimiter(listener), call)
     val latch = CountDownLatch(1)
     throttle.enqueue(object : Callback<Void> {
       override fun onSuccess(value: Void) {
@@ -207,7 +201,7 @@ class ThrottledCallTest {
     val listener: Listener = mock()
 
     val throttle =
-      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
     try {
       throttle.enqueue(null)
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -216,18 +210,7 @@ class ThrottledCallTest {
     }
   }
 
-  private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
-    return createThrottle(1, 1, delegate)
-  }
-
-  private fun createThrottle(
-    poolSize: Int,
-    queueSize: Int,
-    delegate: Supplier<Call<Void>>
-  ): ThrottledCall<Void> {
-    limit.setLimit(limit.getLimit() + 1)
-    return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
-  }
+  private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate)
 
   private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
     override fun doExecute(): Void? {
@@ -252,11 +235,6 @@ class ThrottledCallTest {
     override fun clone() = LockedCall(startLock, waitLock);
   }
 
-  private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
-    return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
-      LinkedBlockingQueue(queueSize))
-  }
-
   private fun mockExhaustedPool(): ExecutorService {
     val mock: ExecutorService = mock()
     doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
diff --git a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
index edac7a5..a883065 100644
--- a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
+++ b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
@@ -23,17 +23,21 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.junit.Before;
 import org.junit.Test;
+import zipkin2.Call;
+import zipkin2.Callback;
 import zipkin2.Endpoint;
 import zipkin2.Span;
 import zipkin2.internal.Trace;
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.BACKEND;
 import static zipkin2.TestObjects.CLIENT_SPAN;
 import static zipkin2.TestObjects.DAY;
@@ -106,6 +110,58 @@ public abstract class ITSpanStore {
     allShouldWorkWhenEmpty();
   }
 
+  @Test public void consumer_properlyImplementsCallContract_execute() throws IOException {
+    // Contract: nothing is stored on assembly, execute() is one-shot, clone() can run again.
+    Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+    // Ensure the implementation didn't accidentally do I/O at assembly time.
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+    call.execute();
+
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+      .containsExactly(LOTS_OF_SPANS[0]);
+
+    try {
+      call.execute();
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IllegalStateException expected) {
+      // Calls are one-shot: executing the same instance a second time must fail.
+    }
+    // no problem to clone a call
+    call.clone().execute();
+  }
+
+  @Test public void consumer_properlyImplementsCallContract_submit() throws Exception {
+    // Contract: nothing is stored on assembly, enqueue() is one-shot, clone() can run again.
+    Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+    // Ensure the implementation didn't accidentally do I/O at assembly time.
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Callback<Void> callback = new Callback<Void>() {
+      @Override public void onSuccess(Void value) {
+        latch.countDown();
+      }
+      // Errors release the latch too, so the test reaches the assertion below instead of hanging.
+      @Override public void onError(Throwable t) {
+        latch.countDown();
+      }
+    };
+    call.enqueue(callback);
+    latch.await();
+
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+      .containsExactly(LOTS_OF_SPANS[0]);
+
+    try {
+      call.enqueue(callback);
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IllegalStateException expected) {
+      // Calls are one-shot: enqueueing the same instance a second time must fail.
+    }
+    // no problem to clone a call
+    call.clone().execute();
+  }
+
   /**
    * Ideally, storage backends can deduplicate identical documents as this will prevent some
    * analysis problems such as double-counting dependency links or other statistics. While this test
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index ce72238..88d2195 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import zipkin2.Call;
+import zipkin2.Callback;
 import zipkin2.DependencyLink;
 import zipkin2.Endpoint;
 import zipkin2.Span;
@@ -189,8 +190,11 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
     autocompleteTags.clear();
   }
 
-  @Override
-  public synchronized Call<Void> accept(List<Span> spans) {
+  @Override public Call<Void> accept(List<Span> spans) {
+    return new StoreSpansCall(spans);
+  }
+
+  synchronized void doAccept(List<Span> spans) {
     int delta = spans.size();
     int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount;
     evictToRecoverSpans(spansToRecover);
@@ -221,7 +225,36 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
         }
       }
     }
-    return Call.create(null /* Void == null */);
+  }
+
+  final class StoreSpansCall extends Call.Base<Void> {
+    final List<Span> spans;
+    // Constructing the call only captures the spans; nothing is stored until doExecute() runs.
+    StoreSpansCall(List<Span> spans) {
+      this.spans = spans;
+    }
+    // Stores the captured spans via doAccept; returns null because the result type is Void.
+    @Override protected Void doExecute() {
+      doAccept(spans);
+      return null;
+    }
+    // Runs doExecute() inline in the calling thread, then reports the outcome to the callback.
+    @Override protected void doEnqueue(Callback<Void> callback) {
+      try {
+        callback.onSuccess(doExecute());
+      } catch (RuntimeException | Error e) {
+        Call.propagateIfFatal(e);
+        callback.onError(e);
+      }
+    }
+    // The copy wraps the same span list in a new call instance.
+    @Override public Call<Void> clone() {
+      return new StoreSpansCall(spans);
+    }
+    // The rendered text includes every captured span.
+    @Override public String toString() {
+      return "StoreSpansCall{" + spans + "}";
+    }
+  }
 
   /** Returns the count of spans evicted. */
diff --git a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
index 24f617f..7ed0d61 100644
--- a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
+++ b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
@@ -83,9 +83,9 @@ public class InMemoryStorageTest {
   }
 
   /** Ensures we don't overload a partition due to key equality being conflated with order */
-  @Test public void differentiatesOnTraceIdWhenTimestampEqual() {
-    storage.accept(asList(CLIENT_SPAN));
-    storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build()));
+  @Test public void differentiatesOnTraceIdWhenTimestampEqual() throws IOException {
+    storage.accept(asList(CLIENT_SPAN)).execute();
+    storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())).execute();
 
     assertThat(storage).extracting("spansByTraceIdTimeStamp.delegate")
       .allSatisfy(map -> assertThat((Map) map).hasSize(2));
@@ -100,8 +100,8 @@ public class InMemoryStorageTest {
       .timestamp(TODAY * 1000)
       .build();
 
-    storage.accept(asList(span));
-    storage.accept(asList(span));
+    storage.accept(asList(span)).execute();
+    storage.accept(asList(span)).execute();
 
     assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly(
       DependencyLink.newBuilder().parent("kafka").child("app").callCount(1L).build()
@@ -119,7 +119,7 @@ public class InMemoryStorageTest {
       .timestamp(TODAY * 1000)
       .build();
 
-    storage.accept(asList(span1, span2));
+    storage.accept(asList(span1, span2)).execute();
 
     assertThat(storage.getSpanNames("app").execute()).containsOnly(
       "root"
@@ -153,7 +153,7 @@ public class InMemoryStorageTest {
       .putTag("http.path", "/users")
       .timestamp(TODAY * 1000)
       .build();
-    storage.accept(asList(span1, span2, span3, span4));
+    storage.accept(asList(span1, span2, span3, span4)).execute();
 
     assertThat(storage.getKeys().execute()).containsOnlyOnce("http.path");
     assertThat(storage.getValues("http.path").execute()).containsOnlyOnce("/users");