You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/10 09:59:31 UTC

[incubator-zipkin] branch error-io-on-assembly created (now 8300976)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a change to branch error-io-on-assembly
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git.


      at 8300976  Makes it an error to store during assembly of a call

This branch includes the following new commits:

     new 8300976  Makes it an error to store during assembly of a call

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-zipkin] 01/01: Makes it an error to store during assembly of a call

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch error-io-on-assembly
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit 8300976933d3f6700c7dec8bf5f82754a5699e56
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Fri May 10 17:57:51 2019 +0800

    Makes it an error to store during assembly of a call
    
    Before this, the throttle package carried extra code to work around a
    bug in our in-memory storage, which stored spans while a call was being
    assembled instead of when it was executed. This fixes that bug and
    removes the extra code.
    
    See #2502
---
 .../server/internal/throttle/ThrottledCall.java    | 59 +++++-------------
 .../throttle/ThrottledStorageComponent.java        |  4 +-
 .../server/internal/throttle/ThrottledCallTest.kt  | 72 ++++++++--------------
 .../src/main/java/zipkin2/storage/ITSpanStore.java | 56 +++++++++++++++++
 .../main/java/zipkin2/storage/InMemoryStorage.java | 39 +++++++++++-
 .../java/zipkin2/storage/InMemoryStorageTest.java  | 14 ++---
 6 files changed, 142 insertions(+), 102 deletions(-)

diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
index f43d61e..67aa71d 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -24,10 +24,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
 import zipkin2.Call;
 import zipkin2.Callback;
-import zipkin2.storage.InMemoryStorage;
 
 /**
  * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
@@ -41,39 +39,21 @@ import zipkin2.storage.InMemoryStorage;
  *
  * @see ThrottledStorageComponent
  */
-final class ThrottledCall<V> extends Call<V> {
+final class ThrottledCall<V> extends Call.Base<V> {
   final ExecutorService executor;
   final Limiter<Void> limiter;
-  final Listener limitListener;
-  /**
-   * supplier call needs to be supplied later to avoid having it take action when it is created
-   * (like {@link InMemoryStorage} and thus avoid being throttled.
-   */
-  final Supplier<? extends Call<V>> supplier;
-  volatile Call<V> delegate;
-  volatile boolean canceled;
-
-  public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
-    Supplier<? extends Call<V>> supplier) {
+  final Call<V> delegate;
+
+  ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
     this.executor = executor;
     this.limiter = limiter;
-    this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
-    this.supplier = supplier;
+    this.delegate = delegate;
   }
 
-  // TODO: refactor this when in-memory no longer executes storage ops during assembly time
-  ThrottledCall(ThrottledCall<V> other) {
-    this(other.executor, other.limiter,
-      other.delegate == null ? other.supplier : () -> other.delegate.clone());
-  }
+  @Override protected V doExecute() throws IOException {
+    Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
 
-  // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
-  // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
-  // the need for assembly time throttling (fix to in-memory storage)
-  @Override public V execute() throws IOException {
     try {
-      delegate = supplier.get();
-
       // Make sure we throttle
       Future<V> future = executor.submit(() -> {
         String oldName = setCurrentThreadName(delegate.toString());
@@ -115,9 +95,11 @@ final class ThrottledCall<V> extends Call<V> {
     }
   }
 
-  @Override public void enqueue(Callback<V> callback) {
+  @Override protected void doEnqueue(Callback<V> callback) {
+    Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+
     try {
-      executor.execute(new QueuedCall(callback));
+      executor.execute(new QueuedCall(callback, limitListener));
     } catch (RuntimeException | Error e) {
       propagateIfFatal(e);
       // Ignoring in all cases here because storage itself isn't saying we need to throttle.  Though, we may still be
@@ -127,21 +109,12 @@ final class ThrottledCall<V> extends Call<V> {
     }
   }
 
-  @Override public void cancel() {
-    canceled = true;
-    if (delegate != null) delegate.cancel();
-  }
-
-  @Override public boolean isCanceled() {
-    return canceled || (delegate != null && delegate.isCanceled());
-  }
-
   @Override public Call<V> clone() {
-    return new ThrottledCall<>(this);
+    return new ThrottledCall<>(executor, limiter, delegate.clone());
   }
 
   @Override public String toString() {
-    return "Throttled" + supplier;
+    return "Throttled" + delegate;
   }
 
   static String setCurrentThreadName(String name) {
@@ -153,17 +126,17 @@ final class ThrottledCall<V> extends Call<V> {
 
   final class QueuedCall implements Runnable {
     final Callback<V> callback;
+    final Listener limitListener;
 
-    QueuedCall(Callback<V> callback) {
+    QueuedCall(Callback<V> callback, Listener limitListener) {
       this.callback = callback;
+      this.limitListener = limitListener;
     }
 
     @Override public void run() {
       try {
         if (isCanceled()) return;
 
-        delegate = ThrottledCall.this.supplier.get();
-
         String oldName = setCurrentThreadName(delegate.toString());
         try {
           enqueueAndWait();
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
index 91e7b78..1422232 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -104,7 +104,7 @@ public final class ThrottledStorageComponent extends StorageComponent {
     return "Throttled" + delegate;
   }
 
-  final class ThrottledSpanConsumer implements SpanConsumer {
+  static final class ThrottledSpanConsumer implements SpanConsumer {
     final SpanConsumer delegate;
     final Limiter<Void> limiter;
     final ExecutorService executor;
@@ -116,7 +116,7 @@ public final class ThrottledStorageComponent extends StorageComponent {
     }
 
     @Override public Call<Void> accept(List<Span> spans) {
-      return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+      return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
     }
 
     @Override public String toString() {
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
index 00eb02f..b729e9a 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener
 import com.netflix.concurrency.limits.limit.SettableLimit
 import com.netflix.concurrency.limits.limiter.SimpleLimiter
 import org.assertj.core.api.Assertions.assertThat
+import org.junit.After
 import org.junit.Test
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
@@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException
 import java.util.concurrent.Semaphore
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit
-import java.util.function.Supplier
 
-// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
-//  refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
 class ThrottledCallTest {
-  var limit = SettableLimit.startingAt(0)
-  var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+  val limit = SettableLimit.startingAt(0)
+  val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
 
-  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+  val numThreads = 1
+  val executor = Executors.newSingleThreadExecutor();
+  @After fun shutdownExecutor() = executor.shutdown()
 
-  @Test fun callCreation_isDeferred() {
-    val created = booleanArrayOf(false)
+  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
 
-    val throttle = createThrottle(Supplier {
-      created[0] = true
-      Call.create<Void>(null)
-    })
+  @Test fun niceToString() {
+    val delegate: Call<Void> = mock()
+    `when`(delegate.toString()).thenReturn("StoreSpansCall{}")
 
-    assertThat(created).contains(false)
-    throttle.execute()
-    assertThat(created).contains(true)
+    assertThat(ThrottledCall(executor, limiter, delegate))
+      .hasToString("ThrottledStoreSpansCall{}")
   }
 
   @Test fun execute_isThrottled() {
-    val numThreads = 1
     val queueSize = 1
     val totalTasks = numThreads + queueSize
+    limit.limit = totalTasks
 
     val startLock = Semaphore(numThreads)
     val waitLock = Semaphore(totalTasks)
     val failLock = Semaphore(1)
-    val throttle =
-      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+    val throttled = throttle(LockedCall(startLock, waitLock))
 
     // Step 1: drain appropriate locks
     startLock.drainPermits()
@@ -83,7 +79,7 @@ class ThrottledCallTest {
     // Step 2: saturate threads and fill queue
     val backgroundPool = Executors.newCachedThreadPool()
     for (i in 0 until totalTasks) {
-      backgroundPool.submit(Callable { throttle.execute() })
+      backgroundPool.submit(Callable { throttled.clone().execute() })
     }
 
     try {
@@ -93,7 +89,7 @@ class ThrottledCallTest {
       // Step 4: submit something beyond our limits
       val future = backgroundPool.submit(Callable {
         try {
-          throttle.execute()
+          throttled.execute()
         } catch (e: IOException) {
           throw RuntimeException(e)
         } finally {
@@ -125,7 +121,7 @@ class ThrottledCallTest {
     val call = FakeCall()
     call.overCapacity = true
 
-    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+    val throttle = ThrottledCall(executor, mockLimiter(listener), call)
     try {
       throttle.execute()
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -137,8 +133,7 @@ class ThrottledCallTest {
   @Test fun execute_ignoresLimit_whenPoolFull() {
     val listener: Listener = mock()
 
-    val throttle =
-      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+    val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
     try {
       throttle.execute()
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -148,14 +143,13 @@ class ThrottledCallTest {
   }
 
   @Test fun enqueue_isThrottled() {
-    val numThreads = 1
     val queueSize = 1
     val totalTasks = numThreads + queueSize
+    limit.limit = totalTasks
 
     val startLock = Semaphore(numThreads)
     val waitLock = Semaphore(totalTasks)
-    val throttle =
-      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+    val throttle = throttle(LockedCall(startLock, waitLock))
 
     // Step 1: drain appropriate locks
     startLock.drainPermits()
@@ -164,7 +158,7 @@ class ThrottledCallTest {
     // Step 2: saturate threads and fill queue
     val callback: Callback<Void> = mock()
     for (i in 0 until totalTasks) {
-      throttle.enqueue(callback)
+      throttle.clone().enqueue(callback)
     }
 
     // Step 3: make sure the threads actually started
@@ -172,7 +166,7 @@ class ThrottledCallTest {
 
     try {
       // Step 4: submit something beyond our limits and make sure it fails
-      throttle.enqueue(callback)
+      throttle.clone().enqueue(callback)
 
       assertThat(true).isFalse() // should raise a RejectedExecutionException
     } catch (e: RejectedExecutionException) {
@@ -187,7 +181,7 @@ class ThrottledCallTest {
     val call = FakeCall()
     call.overCapacity = true
 
-    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+    val throttle = ThrottledCall(executor, mockLimiter(listener), call)
     val latch = CountDownLatch(1)
     throttle.enqueue(object : Callback<Void> {
       override fun onSuccess(value: Void) {
@@ -207,7 +201,7 @@ class ThrottledCallTest {
     val listener: Listener = mock()
 
     val throttle =
-      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
     try {
       throttle.enqueue(null)
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -216,18 +210,7 @@ class ThrottledCallTest {
     }
   }
 
-  private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
-    return createThrottle(1, 1, delegate)
-  }
-
-  private fun createThrottle(
-    poolSize: Int,
-    queueSize: Int,
-    delegate: Supplier<Call<Void>>
-  ): ThrottledCall<Void> {
-    limit.setLimit(limit.getLimit() + 1)
-    return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
-  }
+  private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate)
 
   private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
     override fun doExecute(): Void? {
@@ -252,11 +235,6 @@ class ThrottledCallTest {
     override fun clone() = LockedCall(startLock, waitLock);
   }
 
-  private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
-    return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
-      LinkedBlockingQueue(queueSize))
-  }
-
   private fun mockExhaustedPool(): ExecutorService {
     val mock: ExecutorService = mock()
     doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
diff --git a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
index edac7a5..a883065 100644
--- a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
+++ b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
@@ -23,17 +23,21 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.junit.Before;
 import org.junit.Test;
+import zipkin2.Call;
+import zipkin2.Callback;
 import zipkin2.Endpoint;
 import zipkin2.Span;
 import zipkin2.internal.Trace;
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.BACKEND;
 import static zipkin2.TestObjects.CLIENT_SPAN;
 import static zipkin2.TestObjects.DAY;
@@ -106,6 +110,58 @@ public abstract class ITSpanStore {
     allShouldWorkWhenEmpty();
   }
 
+  @Test public void consumer_properlyImplementsCallContract_execute() throws IOException {
+    Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+
+    // Assembling the call must not do I/O: nothing is stored until it is executed.
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+    call.execute();
+
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+      .containsExactly(LOTS_OF_SPANS[0]);
+
+    try {
+      call.execute(); // second invocation of the same instance must be rejected
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IllegalStateException expected) { // calls are one-shot; clone() to run again
+    }
+
+    // A clone is a fresh call, so executing it is fine.
+    call.clone().execute();
+  }
+
+  @Test public void consumer_properlyImplementsCallContract_submit() throws Exception {
+    Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+    // Assembling the call must not do I/O: nothing is stored until it is enqueued.
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Callback<Void> callback = new Callback<Void>() {
+      @Override public void onSuccess(Void value) {
+        latch.countDown();
+      }
+
+      @Override public void onError(Throwable t) {
+        latch.countDown(); // still unblock: the assertion below fails if nothing was stored
+      }
+    };
+
+    call.enqueue(callback);
+    latch.await(); // wait for the asynchronous store to complete either way
+
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+      .containsExactly(LOTS_OF_SPANS[0]);
+
+    try {
+      call.enqueue(callback); // second invocation of the same instance must be rejected
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IllegalStateException expected) { // calls are one-shot; clone() to run again
+    }
+
+    // A clone is a fresh call, so executing it is fine.
+    call.clone().execute();
+  }
+
   /**
    * Ideally, storage backends can deduplicate identical documents as this will prevent some
    * analysis problems such as double-counting dependency links or other statistics. While this test
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index ce72238..88d2195 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import zipkin2.Call;
+import zipkin2.Callback;
 import zipkin2.DependencyLink;
 import zipkin2.Endpoint;
 import zipkin2.Span;
@@ -189,8 +190,11 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
     autocompleteTags.clear();
   }
 
-  @Override
-  public synchronized Call<Void> accept(List<Span> spans) {
+  @Override public Call<Void> accept(List<Span> spans) {
+    return new StoreSpansCall(spans);
+  }
+
+  synchronized void doAccept(List<Span> spans) {
     int delta = spans.size();
     int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount;
     evictToRecoverSpans(spansToRecover);
@@ -221,7 +225,36 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
         }
       }
     }
-    return Call.create(null /* Void == null */);
+  }
+
+  final class StoreSpansCall extends Call.Base<Void> { // stores on execution, not assembly
+    final List<Span> spans;
+
+    StoreSpansCall(List<Span> spans) {
+      this.spans = spans;
+    }
+    /** Stores the spans via {@code doAccept}; returns null as the result type is Void. */
+    @Override protected Void doExecute() {
+      doAccept(spans);
+      return null;
+    }
+    /** Stores the spans on the calling thread, then reports the outcome to the callback. */
+    @Override protected void doEnqueue(Callback<Void> callback) {
+      try {
+        callback.onSuccess(doExecute());
+      } catch (RuntimeException | Error e) {
+        Call.propagateIfFatal(e);
+        callback.onError(e);
+      }
+    }
+    /** Returns a new call over the same span list, so the store can be invoked again. */
+    @Override public Call<Void> clone() {
+      return new StoreSpansCall(spans);
+    }
+    /** Renders the call name followed by the spans it would store. */
+    @Override public String toString() {
+      return "StoreSpansCall{" + spans + "}";
+    }
+  }
 
   /** Returns the count of spans evicted. */
diff --git a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
index 24f617f..7ed0d61 100644
--- a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
+++ b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
@@ -83,9 +83,9 @@ public class InMemoryStorageTest {
   }
 
   /** Ensures we don't overload a partition due to key equality being conflated with order */
-  @Test public void differentiatesOnTraceIdWhenTimestampEqual() {
-    storage.accept(asList(CLIENT_SPAN));
-    storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build()));
+  @Test public void differentiatesOnTraceIdWhenTimestampEqual() throws IOException {
+    storage.accept(asList(CLIENT_SPAN)).execute();
+    storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())).execute();
 
     assertThat(storage).extracting("spansByTraceIdTimeStamp.delegate")
       .allSatisfy(map -> assertThat((Map) map).hasSize(2));
@@ -100,8 +100,8 @@ public class InMemoryStorageTest {
       .timestamp(TODAY * 1000)
       .build();
 
-    storage.accept(asList(span));
-    storage.accept(asList(span));
+    storage.accept(asList(span)).execute();
+    storage.accept(asList(span)).execute();
 
     assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly(
       DependencyLink.newBuilder().parent("kafka").child("app").callCount(1L).build()
@@ -119,7 +119,7 @@ public class InMemoryStorageTest {
       .timestamp(TODAY * 1000)
       .build();
 
-    storage.accept(asList(span1, span2));
+    storage.accept(asList(span1, span2)).execute();
 
     assertThat(storage.getSpanNames("app").execute()).containsOnly(
       "root"
@@ -153,7 +153,7 @@ public class InMemoryStorageTest {
       .putTag("http.path", "/users")
       .timestamp(TODAY * 1000)
       .build();
-    storage.accept(asList(span1, span2, span3, span4));
+    storage.accept(asList(span1, span2, span3, span4)).execute();
 
     assertThat(storage.getKeys().execute()).containsOnlyOnce("http.path");
     assertThat(storage.getValues("http.path").execute()).containsOnlyOnce("/users");