You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/11 02:25:30 UTC
[incubator-zipkin] branch master updated: Makes it an error to
store during assembly of a call (#2580)
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new 862e3f3 Makes it an error to store during assembly of a call (#2580)
862e3f3 is described below
commit 862e3f3d6979cd66fa75f83e4def1e3bbcf1f3b0
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Sat May 11 10:25:25 2019 +0800
Makes it an error to store during assembly of a call (#2580)
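Before this, the throttle package carried extra code to work around a
bug in our in-memory storage, which performed storage operations while a
call was being assembled rather than when it was executed. This fixes
that bug and removes the extra code.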
See #2502
---
.../internal/BulkRequestBenchmarks.java | 11 +-
.../ZipkinElasticsearchStorageProperties.java | 2 +-
.../server/internal/throttle/ThrottledCall.java | 83 ++++++---------
.../throttle/ThrottledStorageComponent.java | 8 +-
.../server/internal/throttle/ThrottledCallTest.kt | 72 +++++--------
.../throttle/ThrottledStorageComponentTest.kt | 2 +-
.../elasticsearch/internal/BulkCallBuilder.java | 113 ++++++++++++---------
.../elasticsearch/internal/client/HttpCall.java | 17 ++--
.../zipkin2/elasticsearch/InternalForTests.java | 9 +-
.../internal/client/HttpCallTest.java | 19 ++++
.../src/main/java/zipkin2/storage/ITSpanStore.java | 56 ++++++++++
.../main/java/zipkin2/storage/InMemoryStorage.java | 39 ++++++-
.../java/zipkin2/storage/InMemoryStorageTest.java | 14 +--
13 files changed, 266 insertions(+), 179 deletions(-)
diff --git a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
index 210edac..ea2b5cb 100644
--- a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
+++ b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
@@ -38,6 +38,7 @@ import zipkin2.Span;
import zipkin2.codec.CodecBenchmarks;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;
+import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;
@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@@ -50,18 +51,24 @@ public class BulkRequestBenchmarks {
static final Span CLIENT_SPAN = SpanBytesDecoder.JSON_V2.decodeOne(read("/zipkin2-client.json"));
final ElasticsearchStorage es = ElasticsearchStorage.newBuilder().build();
- final BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
-
final long indexTimestamp = CLIENT_SPAN.timestampAsLong() / 1000L;
final String spanIndex =
es.indexNameFormatter().formatTypeAndTimestampForInsert("span", '-', indexTimestamp);
+ final IndexEntry<Span> entry =
+ BulkCallBuilder.newIndexEntry(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
+
+ @Benchmark public void writeRequest_singleSpan() throws IOException {
+ BulkCallBuilder.write(Okio.buffer(Okio.blackhole()), entry, true);
+ }
@Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
+ BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
}
@Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
+ BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
index a2fb450..4f367ff 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
@@ -66,7 +66,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
ZipkinElasticsearchStorageProperties(
@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
- @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
+ @Value("${zipkin.storage.throttle.max-concurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
index f43d61e..066042c 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -24,10 +24,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
import zipkin2.Call;
import zipkin2.Callback;
-import zipkin2.storage.InMemoryStorage;
/**
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
@@ -41,39 +39,21 @@ import zipkin2.storage.InMemoryStorage;
*
* @see ThrottledStorageComponent
*/
-final class ThrottledCall<V> extends Call<V> {
+final class ThrottledCall<V> extends Call.Base<V> {
final ExecutorService executor;
final Limiter<Void> limiter;
- final Listener limitListener;
- /**
- * supplier call needs to be supplied later to avoid having it take action when it is created
- * (like {@link InMemoryStorage} and thus avoid being throttled.
- */
- final Supplier<? extends Call<V>> supplier;
- volatile Call<V> delegate;
- volatile boolean canceled;
-
- public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
- Supplier<? extends Call<V>> supplier) {
+ final Call<V> delegate;
+
+ ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
this.executor = executor;
this.limiter = limiter;
- this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
- this.supplier = supplier;
+ this.delegate = delegate;
}
- // TODO: refactor this when in-memory no longer executes storage ops during assembly time
- ThrottledCall(ThrottledCall<V> other) {
- this(other.executor, other.limiter,
- other.delegate == null ? other.supplier : () -> other.delegate.clone());
- }
+ @Override protected V doExecute() throws IOException {
+ Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
- // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
- // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
- // the need for assembly time throttling (fix to in-memory storage)
- @Override public V execute() throws IOException {
try {
- delegate = supplier.get();
-
// Make sure we throttle
Future<V> future = executor.submit(() -> {
String oldName = setCurrentThreadName(delegate.toString());
@@ -115,9 +95,11 @@ final class ThrottledCall<V> extends Call<V> {
}
}
- @Override public void enqueue(Callback<V> callback) {
+ @Override protected void doEnqueue(Callback<V> callback) {
+ Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+
try {
- executor.execute(new QueuedCall(callback));
+ executor.execute(new QueuedCall<>(delegate, callback, limitListener));
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
@@ -127,21 +109,12 @@ final class ThrottledCall<V> extends Call<V> {
}
}
- @Override public void cancel() {
- canceled = true;
- if (delegate != null) delegate.cancel();
- }
-
- @Override public boolean isCanceled() {
- return canceled || (delegate != null && delegate.isCanceled());
- }
-
@Override public Call<V> clone() {
- return new ThrottledCall<>(this);
+ return new ThrottledCall<>(executor, limiter, delegate.clone());
}
@Override public String toString() {
- return "Throttled" + supplier;
+ return "Throttled(" + delegate + ")";
}
static String setCurrentThreadName(String name) {
@@ -151,18 +124,20 @@ final class ThrottledCall<V> extends Call<V> {
return originalName;
}
- final class QueuedCall implements Runnable {
+ static final class QueuedCall<V> implements Runnable {
+ final Call<V> delegate;
final Callback<V> callback;
+ final Listener limitListener;
- QueuedCall(Callback<V> callback) {
+ QueuedCall(Call<V> delegate, Callback<V> callback, Listener limitListener) {
+ this.delegate = delegate;
this.callback = callback;
+ this.limitListener = limitListener;
}
@Override public void run() {
try {
- if (isCanceled()) return;
-
- delegate = ThrottledCall.this.supplier.get();
+ if (delegate.isCanceled()) return;
String oldName = setCurrentThreadName(delegate.toString());
try {
@@ -185,15 +160,19 @@ final class ThrottledCall<V> extends Call<V> {
// This ensures we don't exceed our throttle/queue limits.
throttleCallback.await();
}
+
+ @Override public String toString() {
+ return "QueuedCall{delegate=" + delegate + ", callback=" + callback + "}";
+ }
}
static final class ThrottledCallback<V> implements Callback<V> {
- final Callback<V> supplier;
+ final Callback<V> delegate;
final Listener limitListener;
final CountDownLatch latch = new CountDownLatch(1);
- ThrottledCallback(Callback<V> supplier, Listener limitListener) {
- this.supplier = supplier;
+ ThrottledCallback(Callback<V> delegate, Listener limitListener) {
+ this.delegate = delegate;
this.limitListener = limitListener;
}
@@ -210,7 +189,7 @@ final class ThrottledCall<V> extends Call<V> {
@Override public void onSuccess(V value) {
try {
limitListener.onSuccess();
- supplier.onSuccess(value);
+ delegate.onSuccess(value);
} finally {
latch.countDown();
}
@@ -224,10 +203,14 @@ final class ThrottledCall<V> extends Call<V> {
limitListener.onIgnore();
}
- supplier.onError(t);
+ delegate.onError(t);
} finally {
latch.countDown();
}
}
+
+ @Override public String toString() {
+ return "Throttled(" + delegate + ")";
+ }
}
}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
index 91e7b78..42dc85c 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -101,10 +101,10 @@ public final class ThrottledStorageComponent extends StorageComponent {
}
@Override public String toString() {
- return "Throttled" + delegate;
+ return "Throttled(" + delegate + ")";
}
- final class ThrottledSpanConsumer implements SpanConsumer {
+ static final class ThrottledSpanConsumer implements SpanConsumer {
final SpanConsumer delegate;
final Limiter<Void> limiter;
final ExecutorService executor;
@@ -116,11 +116,11 @@ public final class ThrottledStorageComponent extends StorageComponent {
}
@Override public Call<Void> accept(List<Span> spans) {
- return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+ return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
}
@Override public String toString() {
- return "Throttled" + delegate;
+ return "Throttled(" + delegate + ")";
}
}
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
index 00eb02f..bce9bf6 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener
import com.netflix.concurrency.limits.limit.SettableLimit
import com.netflix.concurrency.limits.limiter.SimpleLimiter
import org.assertj.core.api.Assertions.assertThat
+import org.junit.After
import org.junit.Test
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
@@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.Semaphore
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
-import java.util.function.Supplier
-// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
-// refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
class ThrottledCallTest {
- var limit = SettableLimit.startingAt(0)
- var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+ val limit = SettableLimit.startingAt(0)
+ val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
- inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+ val numThreads = 1
+ val executor = Executors.newSingleThreadExecutor();
+ @After fun shutdownExecutor() = executor.shutdown()
- @Test fun callCreation_isDeferred() {
- val created = booleanArrayOf(false)
+ inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
- val throttle = createThrottle(Supplier {
- created[0] = true
- Call.create<Void>(null)
- })
+ @Test fun niceToString() {
+ val delegate: Call<Void> = mock()
+ `when`(delegate.toString()).thenReturn("StoreSpansCall{}")
- assertThat(created).contains(false)
- throttle.execute()
- assertThat(created).contains(true)
+ assertThat(ThrottledCall(executor, limiter, delegate))
+ .hasToString("Throttled(StoreSpansCall{})")
}
@Test fun execute_isThrottled() {
- val numThreads = 1
val queueSize = 1
val totalTasks = numThreads + queueSize
+ limit.limit = totalTasks
val startLock = Semaphore(numThreads)
val waitLock = Semaphore(totalTasks)
val failLock = Semaphore(1)
- val throttle =
- createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+ val throttled = throttle(LockedCall(startLock, waitLock))
// Step 1: drain appropriate locks
startLock.drainPermits()
@@ -83,7 +79,7 @@ class ThrottledCallTest {
// Step 2: saturate threads and fill queue
val backgroundPool = Executors.newCachedThreadPool()
for (i in 0 until totalTasks) {
- backgroundPool.submit(Callable { throttle.execute() })
+ backgroundPool.submit(Callable { throttled.clone().execute() })
}
try {
@@ -93,7 +89,7 @@ class ThrottledCallTest {
// Step 4: submit something beyond our limits
val future = backgroundPool.submit(Callable {
try {
- throttle.execute()
+ throttled.execute()
} catch (e: IOException) {
throw RuntimeException(e)
} finally {
@@ -125,7 +121,7 @@ class ThrottledCallTest {
val call = FakeCall()
call.overCapacity = true
- val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+ val throttle = ThrottledCall(executor, mockLimiter(listener), call)
try {
throttle.execute()
assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -137,8 +133,7 @@ class ThrottledCallTest {
@Test fun execute_ignoresLimit_whenPoolFull() {
val listener: Listener = mock()
- val throttle =
- ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+ val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
try {
throttle.execute()
assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -148,14 +143,13 @@ class ThrottledCallTest {
}
@Test fun enqueue_isThrottled() {
- val numThreads = 1
val queueSize = 1
val totalTasks = numThreads + queueSize
+ limit.limit = totalTasks
val startLock = Semaphore(numThreads)
val waitLock = Semaphore(totalTasks)
- val throttle =
- createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+ val throttle = throttle(LockedCall(startLock, waitLock))
// Step 1: drain appropriate locks
startLock.drainPermits()
@@ -164,7 +158,7 @@ class ThrottledCallTest {
// Step 2: saturate threads and fill queue
val callback: Callback<Void> = mock()
for (i in 0 until totalTasks) {
- throttle.enqueue(callback)
+ throttle.clone().enqueue(callback)
}
// Step 3: make sure the threads actually started
@@ -172,7 +166,7 @@ class ThrottledCallTest {
try {
// Step 4: submit something beyond our limits and make sure it fails
- throttle.enqueue(callback)
+ throttle.clone().enqueue(callback)
assertThat(true).isFalse() // should raise a RejectedExecutionException
} catch (e: RejectedExecutionException) {
@@ -187,7 +181,7 @@ class ThrottledCallTest {
val call = FakeCall()
call.overCapacity = true
- val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+ val throttle = ThrottledCall(executor, mockLimiter(listener), call)
val latch = CountDownLatch(1)
throttle.enqueue(object : Callback<Void> {
override fun onSuccess(value: Void) {
@@ -207,7 +201,7 @@ class ThrottledCallTest {
val listener: Listener = mock()
val throttle =
- ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+ ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
try {
throttle.enqueue(null)
assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -216,18 +210,7 @@ class ThrottledCallTest {
}
}
- private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
- return createThrottle(1, 1, delegate)
- }
-
- private fun createThrottle(
- poolSize: Int,
- queueSize: Int,
- delegate: Supplier<Call<Void>>
- ): ThrottledCall<Void> {
- limit.setLimit(limit.getLimit() + 1)
- return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
- }
+ private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate)
private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
override fun doExecute(): Void? {
@@ -252,11 +235,6 @@ class ThrottledCallTest {
override fun clone() = LockedCall(startLock, waitLock);
}
- private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
- return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
- LinkedBlockingQueue(queueSize))
- }
-
private fun mockExhaustedPool(): ExecutorService {
val mock: ExecutorService = mock()
doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
index 705967a..2a804df 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
@@ -46,6 +46,6 @@ class ThrottledStorageComponentTest {
@Test fun niceToString() {
assertThat(ThrottledStorageComponent(delegate, registry, 1, 2, 1))
- .hasToString("ThrottledInMemoryStorage{traceCount=0}");
+ .hasToString("Throttled(InMemoryStorage{traceCount=0})");
}
}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
index e5d7c6f..2361907 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
@@ -16,8 +16,11 @@
*/
package zipkin2.elasticsearch.internal;
+import com.google.auto.value.AutoValue;
import com.squareup.moshi.JsonWriter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
@@ -42,7 +45,7 @@ public final class BulkCallBuilder {
final boolean waitForRefresh;
// Mutated for each call to index
- final Buffer buffer = new Buffer();
+ final List<IndexEntry<?>> entries = new ArrayList<>();
public BulkCallBuilder(ElasticsearchStorage es, float esVersion, String tag) {
this.tag = tag;
@@ -52,45 +55,23 @@ public final class BulkCallBuilder {
waitForRefresh = es.flushOnWrites();
}
- enum CheckForErrors implements HttpCall.BodyConverter<Void> {
- INSTANCE;
+ static <T> IndexEntry<T> newIndexEntry(String index, String typeName, T input,
+ BulkIndexWriter<T> writer) {
+ return new AutoValue_BulkCallBuilder_IndexEntry<>(index, typeName, input, writer);
+ }
- @Override public Void convert(BufferedSource b) throws IOException {
- String content = b.readUtf8();
- if (content.contains("\"status\":429")) throw new RejectedExecutionException(content);
- if (content.contains("\"errors\":true")) throw new IllegalStateException(content);
- return null;
- }
+ @AutoValue static abstract class IndexEntry<T> {
+ abstract String index();
- @Override public String toString() {
- return "CheckForErrors";
- }
- }
+ abstract String typeName();
- public <T> void index(String index, String typeName, T input, BulkIndexWriter<T> writer) {
- Buffer document = new Buffer();
- String id = writer.writeDocument(input, document);
- writeIndexMetadata(buffer, index, typeName, id);
- buffer.writeByte('\n');
- buffer.write(document, document.size());
- buffer.writeByte('\n');
+ abstract T input();
+
+ abstract BulkIndexWriter<T> writer();
}
- void writeIndexMetadata(Buffer indexBuffer, String index, String typeName, String id) {
- JsonWriter jsonWriter = JsonWriter.of(indexBuffer);
- try {
- jsonWriter.beginObject();
- jsonWriter.name("index");
- jsonWriter.beginObject();
- jsonWriter.name("_index").value(index);
- // the _type parameter is needed for Elasticsearch < 6.x
- if (shouldAddType) jsonWriter.name("_type").value(typeName);
- jsonWriter.name("_id").value(id);
- jsonWriter.endObject();
- jsonWriter.endObject();
- } catch (IOException e) {
- throw new AssertionError(e); // No I/O writing to a Buffer.
- }
+ public <T> void index(String index, String typeName, T input, BulkIndexWriter<T> writer) {
+ entries.add(newIndexEntry(index, typeName, input, writer));
}
/** Creates a bulk request when there is more than one object to store */
@@ -99,36 +80,72 @@ public final class BulkCallBuilder {
if (pipeline != null) urlBuilder.addQueryParameter("pipeline", pipeline);
if (waitForRefresh) urlBuilder.addQueryParameter("refresh", "wait_for");
- RequestBody body = new BufferRequestBody(buffer);
+ RequestBody body = new BulkRequestBody(entries, shouldAddType);
Request request = new Request.Builder().url(urlBuilder.build()).tag(tag).post(body).build();
return http.newCall(request, CheckForErrors.INSTANCE);
}
/** This avoids allocating a large byte array (by using a poolable buffer instead). */
- static final class BufferRequestBody extends RequestBody {
- final long contentLength;
- final Buffer buffer;
+ static final class BulkRequestBody extends RequestBody {
+ final List<IndexEntry<?>> entries;
+ final boolean shouldAddType;
- BufferRequestBody(Buffer buffer) {
- this.contentLength = buffer.size();
- this.buffer = buffer;
+ BulkRequestBody(List<IndexEntry<?>> entries, boolean shouldAddType) {
+ this.entries = entries;
+ this.shouldAddType = shouldAddType;
}
@Override public MediaType contentType() {
return APPLICATION_JSON;
}
- @Override public long contentLength() {
- return contentLength;
+ @Override public void writeTo(BufferedSink sink) throws IOException {
+ for (int i = 0, length = entries.size(); i < length; i++) {
+ write(sink, entries.get(i), shouldAddType);
+ }
}
+ }
+
+ static void write(BufferedSink sink, IndexEntry entry, boolean shouldAddType) throws IOException {
+ Buffer document = new Buffer();
+ String id = entry.writer().writeDocument(entry.input(), document);
+ writeIndexMetadata(sink, entry, id, shouldAddType);
+ sink.writeByte('\n');
+ sink.write(document, document.size());
+ sink.writeByte('\n');
+ }
+
+ static void writeIndexMetadata(BufferedSink sink, IndexEntry entry, String id,
+ boolean shouldAddType) {
+ JsonWriter jsonWriter = JsonWriter.of(sink);
+ try {
+ jsonWriter.beginObject();
+ jsonWriter.name("index");
+ jsonWriter.beginObject();
+ jsonWriter.name("_index").value(entry.index());
+ // the _type parameter is needed for Elasticsearch < 6.x
+ if (shouldAddType) jsonWriter.name("_type").value(entry.typeName());
+ jsonWriter.name("_id").value(id);
+ jsonWriter.endObject();
+ jsonWriter.endObject();
+ } catch (IOException e) {
+ throw new AssertionError(e); // No I/O writing to a Buffer.
+ }
+ }
- @Override public boolean isOneShot() {
- return true;
+ enum CheckForErrors implements HttpCall.BodyConverter<Void> {
+ INSTANCE;
+
+ @Override public Void convert(BufferedSource b) throws IOException {
+ String content = b.readUtf8();
+ if (content.contains("\"status\":429")) throw new RejectedExecutionException(content);
+ if (content.contains("\"errors\":true")) throw new IllegalStateException(content);
+ return null;
}
- @Override public void writeTo(BufferedSink sink) throws IOException {
- sink.write(buffer, contentLength);
+ @Override public String toString() {
+ return "CheckForErrors";
}
}
}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
index 507ad8e..736a960 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
@@ -31,7 +31,7 @@ import okio.Okio;
import zipkin2.Call;
import zipkin2.Callback;
-public final class HttpCall<V> extends Call<V> {
+public final class HttpCall<V> extends Call.Base<V> {
public interface BodyConverter<V> {
V convert(BufferedSource content) throws IOException;
@@ -61,7 +61,6 @@ public final class HttpCall<V> extends Call<V> {
public final BodyConverter<V> bodyConverter;
final Semaphore semaphore;
-
HttpCall(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this(
factory.ok.newCall(request),
@@ -76,7 +75,7 @@ public final class HttpCall<V> extends Call<V> {
this.bodyConverter = bodyConverter;
}
- @Override public V execute() throws IOException {
+ @Override protected V doExecute() throws IOException {
if (!semaphore.tryAcquire()) throw new IllegalStateException("over capacity");
try {
return parseResponse(call.execute(), bodyConverter);
@@ -85,22 +84,18 @@ public final class HttpCall<V> extends Call<V> {
}
}
- @Override public void enqueue(Callback<V> delegate) {
+ @Override protected void doEnqueue(Callback<V> callback) {
if (!semaphore.tryAcquire()) {
- delegate.onError(new IllegalStateException("over capacity"));
+ callback.onError(new IllegalStateException("over capacity"));
return;
}
- call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
+ call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, callback));
}
- @Override public void cancel() {
+ @Override protected void doCancel() {
call.cancel();
}
- @Override public boolean isCanceled() {
- return call.isCanceled();
- }
-
@Override public HttpCall<V> clone() {
return new HttpCall<V>(call.clone(), semaphore, bodyConverter);
}
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
index b221fa0..0847f7d 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
@@ -22,8 +22,8 @@ import java.io.UncheckedIOException;
import java.util.List;
import okio.BufferedSink;
import zipkin2.DependencyLink;
-import zipkin2.elasticsearch.internal.BulkIndexWriter;
import zipkin2.elasticsearch.internal.BulkCallBuilder;
+import zipkin2.elasticsearch.internal.BulkIndexWriter;
/** Package accessor for integration tests */
public class InternalForTests {
@@ -32,9 +32,8 @@ public class InternalForTests {
String index = ((ElasticsearchSpanConsumer) es.spanConsumer())
.formatTypeAndTimestampForInsert("dependency", midnightUTC);
BulkCallBuilder indexer = new BulkCallBuilder(es, es.version(), "indexlinks");
- for (DependencyLink link : links) {
- indexer.index(index, "dependency", link, DEPENDENCY_LINK_BULK_INDEX_SUPPORT);
- }
+ for (DependencyLink link : links)
+ indexer.index(index, "dependency", link, DEPENDENCY_LINK_WRITER);
try {
indexer.build().execute();
} catch (IOException e) {
@@ -42,7 +41,7 @@ public class InternalForTests {
}
}
- static final BulkIndexWriter<DependencyLink> DEPENDENCY_LINK_BULK_INDEX_SUPPORT =
+ static final BulkIndexWriter<DependencyLink> DEPENDENCY_LINK_WRITER =
new BulkIndexWriter<DependencyLink>() {
@Override public String writeDocument(DependencyLink link, BufferedSink sink) {
JsonWriter writer = JsonWriter.of(sink);
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
index 18b3c97..9f3ed2e 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
@@ -93,6 +93,25 @@ public class HttpCallTest {
}
@Test
+ public void cloned() throws Exception {
+ mws.enqueue(new MockResponse());
+
+ Call<?> call = http.newCall(request, b -> null);
+ call.execute();
+
+ try {
+ call.execute();
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException expected) {
+ assertThat(expected).isInstanceOf(IllegalStateException.class);
+ }
+
+ mws.enqueue(new MockResponse());
+
+ call.clone().execute();
+ }
+
+ @Test
public void executionException_httpFailure() throws Exception {
mws.enqueue(new MockResponse().setResponseCode(500));
diff --git a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
index edac7a5..a883065 100644
--- a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
+++ b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
@@ -23,17 +23,21 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
+import zipkin2.Call;
+import zipkin2.Callback;
import zipkin2.Endpoint;
import zipkin2.Span;
import zipkin2.internal.Trace;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static zipkin2.TestObjects.BACKEND;
import static zipkin2.TestObjects.CLIENT_SPAN;
import static zipkin2.TestObjects.DAY;
@@ -106,6 +110,58 @@ public abstract class ITSpanStore {
allShouldWorkWhenEmpty();
}
+ @Test public void consumer_properlyImplementsCallContract_execute() throws IOException {
+ Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+
+ // Ensure the implementation didn't accidentally do I/O at assembly time.
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+ call.execute();
+
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+ .containsExactly(LOTS_OF_SPANS[0]);
+
+ try {
+ call.execute(); // second execution of the same instance must be rejected
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException expected) { // a call may only be executed once
+ }
+
+ // no problem to clone a call
+ call.clone().execute();
+ }
+
+ @Test public void consumer_properlyImplementsCallContract_submit() throws Exception {
+ Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+ // Ensure the implementation didn't accidentally do I/O at assembly time.
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+
+ CountDownLatch latch = new CountDownLatch(1); // released once the callback fires, either way
+ Callback<Void> callback = new Callback<Void>() {
+ @Override public void onSuccess(Void value) {
+ latch.countDown();
+ }
+
+ @Override public void onError(Throwable t) {
+ latch.countDown(); // a failed store is detected by the getTrace assertion below
+ }
+ };
+
+ call.enqueue(callback);
+ latch.await(); // NOTE(review): no timeout; hangs if the callback is never invoked
+
+ assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+ .containsExactly(LOTS_OF_SPANS[0]);
+
+ try {
+ call.enqueue(callback); // second submission of the same instance must be rejected
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException expected) { // a call may only be enqueued once
+ }
+
+ // no problem to clone a call
+ call.clone().execute();
+ }
+
/**
* Ideally, storage backends can deduplicate identical documents as this will prevent some
* analysis problems such as double-counting dependency links or other statistics. While this test
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index ce72238..88d2195 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import zipkin2.Call;
+import zipkin2.Callback;
import zipkin2.DependencyLink;
import zipkin2.Endpoint;
import zipkin2.Span;
@@ -189,8 +190,11 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
autocompleteTags.clear();
}
- @Override
- public synchronized Call<Void> accept(List<Span> spans) {
+ @Override public Call<Void> accept(List<Span> spans) {
+ return new StoreSpansCall(spans); // only assembles the call; spans are stored when it is executed or enqueued
+ }
+
+ synchronized void doAccept(List<Span> spans) {
int delta = spans.size();
int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount;
evictToRecoverSpans(spansToRecover);
@@ -221,7 +225,36 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
}
}
}
- return Call.create(null /* Void == null */);
+ }
+
+ final class StoreSpansCall extends Call.Base<Void> { // defers doAccept(spans) until execute/enqueue
+ final List<Span> spans; // retained so clone() can build another call over the same list
+
+ StoreSpansCall(List<Span> spans) {
+ this.spans = spans;
+ }
+
+ @Override protected Void doExecute() {
+ doAccept(spans); // the actual write; doAccept is synchronized on the enclosing storage
+ return null; // Void has no other value
+ }
+
+ @Override protected void doEnqueue(Callback<Void> callback) {
+ try {
+ callback.onSuccess(doExecute()); // stores inline on the calling thread, then signals success
+ } catch (RuntimeException | Error e) {
+ Call.propagateIfFatal(e);
+ callback.onError(e); // NOTE(review): also reached if onSuccess itself throws; confirm intended
+ }
+ }
+
+ @Override public Call<Void> clone() {
+ return new StoreSpansCall(spans); // fresh instance sharing the same span list reference
+ }
+
+ @Override public String toString() {
+ return "StoreSpansCall{" + spans + "}";
+ }
+ }
/** Returns the count of spans evicted. */
diff --git a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
index 24f617f..7ed0d61 100644
--- a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
+++ b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
@@ -83,9 +83,9 @@ public class InMemoryStorageTest {
}
/** Ensures we don't overload a partition due to key equality being conflated with order */
- @Test public void differentiatesOnTraceIdWhenTimestampEqual() {
- storage.accept(asList(CLIENT_SPAN));
- storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build()));
+ @Test public void differentiatesOnTraceIdWhenTimestampEqual() throws IOException {
+ storage.accept(asList(CLIENT_SPAN)).execute();
+ storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())).execute();
assertThat(storage).extracting("spansByTraceIdTimeStamp.delegate")
.allSatisfy(map -> assertThat((Map) map).hasSize(2));
@@ -100,8 +100,8 @@ public class InMemoryStorageTest {
.timestamp(TODAY * 1000)
.build();
- storage.accept(asList(span));
- storage.accept(asList(span));
+ storage.accept(asList(span)).execute();
+ storage.accept(asList(span)).execute();
assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly(
DependencyLink.newBuilder().parent("kafka").child("app").callCount(1L).build()
@@ -119,7 +119,7 @@ public class InMemoryStorageTest {
.timestamp(TODAY * 1000)
.build();
- storage.accept(asList(span1, span2));
+ storage.accept(asList(span1, span2)).execute();
assertThat(storage.getSpanNames("app").execute()).containsOnly(
"root"
@@ -153,7 +153,7 @@ public class InMemoryStorageTest {
.putTag("http.path", "/users")
.timestamp(TODAY * 1000)
.build();
- storage.accept(asList(span1, span2, span3, span4));
+ storage.accept(asList(span1, span2, span3, span4)).execute();
assertThat(storage.getKeys().execute()).containsOnlyOnce("http.path");
assertThat(storage.getValues("http.path").execute()).containsOnlyOnce("/users");