You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/10 08:10:41 UTC
[incubator-zipkin] branch master updated: Adding storage-throttle
module to address "over capacity" issues (#2502)
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new b3eefbe Adding storage-throttle module to address "over capacity" issues (#2502)
b3eefbe is described below
commit b3eefbee1aec34b1036b61913149d805712ea2fc
Author: Logic-32 <Lo...@users.noreply.github.com>
AuthorDate: Fri May 10 02:10:35 2019 -0600
Adding storage-throttle module to address "over capacity" issues (#2502)
Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is
enabled.
Inspired by work done on #2169.
---
.../test/java/zipkin2/collector/CollectorTest.java | 11 +
zipkin-server/README.md | 10 +
zipkin-server/pom.xml | 17 +-
.../internal/ConditionalOnThrottledStorage.java | 46 ++++
.../server/internal/ZipkinServerConfiguration.java | 44 ++++
.../ZipkinElasticsearchStorageProperties.java | 15 +-
.../internal/throttle/ActuateThrottleMetrics.java | 49 ++++
.../server/internal/throttle/ThrottledCall.java | 233 +++++++++++++++++
.../throttle/ThrottledStorageComponent.java | 203 +++++++++++++++
.../throttle/ZipkinStorageThrottleProperties.java | 69 +++++
.../src/main/resources/zipkin-server-shared.yml | 5 +
.../elasticsearch/BasicAuthInterceptorTest.kt | 2 +-
.../server/internal/throttle/ThrottledCallTest.kt | 289 +++++++++++++++++++++
.../throttle/ThrottledStorageComponentTest.kt | 51 ++++
.../elasticsearch/internal/client/HttpCall.java | 5 +
.../main/java/zipkin2/storage/InMemoryStorage.java | 4 +
16 files changed, 1049 insertions(+), 4 deletions(-)
diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
index aabcbcb..56f988c 100644
--- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
+++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
@@ -31,6 +31,7 @@ import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.StorageComponent;
import static java.util.Arrays.asList;
+import java.util.concurrent.RejectedExecutionException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -186,6 +187,16 @@ public class CollectorTest {
}
@Test
+ public void errorAcceptingSpans_onErrorRejectedExecution() {
+ RuntimeException error = new RejectedExecutionException("slow down");
+ collector.handleStorageError(TRACE, error, callback);
+
+ verify(callback).onError(error);
+ assertThat(messages)
+ .containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)");
+ verify(metrics).incrementSpansDropped(4);
+ }
+
public void handleStorageError_onErrorWithNullMessage() {
RuntimeException error = new RuntimeException();
collector.handleStorageError(TRACE, error, callback);
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 7e64055..5863d7b 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -157,6 +157,16 @@ Defaults to true
* `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
* `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)
+### Throttled Storage (Experimental)
+These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):
+
+ * `STORAGE_THROTTLE_ENABLED`: Enables throttling
+ * `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
+ * `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
+ * `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).
+
+As this feature is experimental, it is not recommended to run this in production environments.
+
### Cassandra Storage
Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index f17dd89..a456ddd 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -250,6 +250,17 @@
<version>${micrometer.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.netflix.concurrency-limits</groupId>
+ <artifactId>concurrency-limits-core</artifactId>
+ <version>0.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-core</artifactId>
+ <version>${micrometer.version}</version>
+ </dependency>
+
<!-- Trace api controller activity with Brave -->
<dependency>
<groupId>io.zipkin.brave</groupId>
@@ -299,6 +310,11 @@
<version>2.4.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
@@ -372,7 +388,6 @@
<version>${kotlin.version}</version>
<configuration>
<jvmTarget>${main.java.version}</jvmTarget>
- <experimentalCoroutines>enable</experimentalCoroutines>
</configuration>
<executions>
<execution>
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java
new file mode 100644
index 0000000..0d7cb4e
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
+import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@interface ConditionalOnThrottledStorage {
+ class ThrottledStorageCondition extends SpringBootCondition {
+ @Override
+ public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
+ String throttleEnabled = context.getEnvironment()
+ .getProperty("zipkin.storage.throttle.enabled");
+
+ if (!Boolean.valueOf(throttleEnabled)) {
+ return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
+ }
+
+ return ConditionOutcome.match();
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
index cb381a7..b3d4b90 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
@@ -26,6 +26,9 @@ import com.linecorp.armeria.spring.actuate.ArmeriaSpringActuatorAutoConfiguratio
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import java.util.List;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
@@ -33,20 +36,24 @@ import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCusto
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.server.internal.brave.TracingStorageComponent;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.StorageComponent;
+import zipkin2.server.internal.throttle.ThrottledStorageComponent;
@Configuration
@ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class)
@@ -157,10 +164,47 @@ public class ZipkinServerConfiguration implements WebMvcConfigurer {
}
}
+ @Configuration
+ @EnableConfigurationProperties(ZipkinStorageThrottleProperties.class)
+ @ConditionalOnThrottledStorage
+ static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware {
+
+ /**
+ * Needed to resolve a cyclic instantiation issue with Spring. This is mostly for MeterRegistry, as really
+ * bad things happen if you try to autowire it (loss of JVM metrics), but it is also used for the properties to make
+ * sure no cycles exist at all as a result of turning throttling on.
+ *
+ * <p>Ref: <a href="https://stackoverflow.com/a/19688634">Tracking down cause of Spring's "not eligible for auto-proxying"</a></p>
+ */
+ private BeanFactory beanFactory;
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) {
+ if (bean instanceof StorageComponent) {
+ ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class);
+ return new ThrottledStorageComponent((StorageComponent) bean,
+ beanFactory.getBean(MeterRegistry.class),
+ throttleProperties.getMinConcurrency(),
+ throttleProperties.getMaxConcurrency(),
+ throttleProperties.getMaxQueueSize());
+ }
+ return bean;
+ }
+
+ @Override
+ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
+ this.beanFactory = beanFactory;
+ }
+ }
+
/**
* This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
* supply both read apis, so we add two beans here.
+ *
+ * <p>Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with
+ * {@link ThrottledStorageComponentEnhancer}.
*/
+ @Lazy
@Configuration
@Conditional(StorageTypeMemAbsentOrEmpty.class)
@ConditionalOnMissingBean(StorageComponent.class)
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
index 8ba574f..a2fb450 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import zipkin2.elasticsearch.ElasticsearchStorage;
@@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String index = "zipkin";
/** The date separator used to create the index name. Default to -. */
private String dateSeparator = "-";
- /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
+ /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overridden by throttle settings) */
private int maxRequests = 64;
+ /** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */
+ private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
/** Number of replicas (redundancy factor) per index. Defaults to 1.` */
@@ -61,6 +64,14 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;
+ ZipkinElasticsearchStorageProperties(
+ @Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
+ @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
+ if (throttleEnabled) {
+ this.throttleMaxConcurrency = throttleMaxConcurrency;
+ }
+ }
+
public String getPipeline() {
return pipeline;
}
@@ -180,7 +191,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
.index(index)
.dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
.pipeline(pipeline)
- .maxRequests(maxRequests)
+ .maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
.indexShards(indexShards)
.indexReplicas(indexReplicas);
}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java
new file mode 100644
index 0000000..55e7c36
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.limiter.AbstractLimiter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.util.concurrent.ThreadPoolExecutor;
+import zipkin2.server.internal.ActuateCollectorMetrics;
+
+/** Follows the same naming convention as {@link ActuateCollectorMetrics} */
+final class ActuateThrottleMetrics {
+ final MeterRegistry registryInstance;
+
+ ActuateThrottleMetrics(MeterRegistry registryInstance) {
+ this.registryInstance = registryInstance;
+ }
+
+ void bind(ThreadPoolExecutor pool) {
+ Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
+ .description("number of threads running storage requests")
+ .register(registryInstance);
+ Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
+ .description("number of items queued waiting for access to storage")
+ .register(registryInstance);
+ }
+
+ void bind(AbstractLimiter limiter) {
+ // This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
+ // It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners.
+ Gauge.builder("zipkin_storage.throttle.in_flight_requests", limiter::getInflight)
+ .description("number of requests the limiter thinks are active")
+ .register(registryInstance);
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
new file mode 100644
index 0000000..f43d61e
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.Limiter.Listener;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+import zipkin2.Call;
+import zipkin2.Callback;
+import zipkin2.storage.InMemoryStorage;
+
+/**
+ * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
+ * serves two purposes:
+ * <ol>
+ * <li>Limits the number of requests that can run in parallel.</li>
+ * <li>Depending on configuration, can queue up requests to make sure we don't aggressively drop
+ * requests that would otherwise succeed if given a moment. Bounded queues are safest for this as
+ * unbounded ones can lead to heap exhaustion and {@link OutOfMemoryError OOM errors}.</li>
+ * </ol>
+ *
+ * @see ThrottledStorageComponent
+ */
+final class ThrottledCall<V> extends Call<V> {
+ final ExecutorService executor;
+ final Limiter<Void> limiter;
+ final Listener limitListener;
+ /**
+ * The delegate call is supplied lazily so that it cannot take action when it is created (as happens
+ * with {@link InMemoryStorage}) and thereby avoid being throttled.
+ */
+ final Supplier<? extends Call<V>> supplier;
+ volatile Call<V> delegate;
+ volatile boolean canceled;
+
+ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
+ Supplier<? extends Call<V>> supplier) {
+ this.executor = executor;
+ this.limiter = limiter;
+ this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+ this.supplier = supplier;
+ }
+
+ // TODO: refactor this when in-memory no longer executes storage ops during assembly time
+ ThrottledCall(ThrottledCall<V> other) {
+ this(other.executor, other.limiter,
+ other.delegate == null ? other.supplier : () -> other.delegate.clone());
+ }
+
+ // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
+ // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
+ // the need for assembly time throttling (fix to in-memory storage)
+ @Override public V execute() throws IOException {
+ try {
+ delegate = supplier.get();
+
+ // Make sure we throttle
+ Future<V> future = executor.submit(() -> {
+ String oldName = setCurrentThreadName(delegate.toString());
+ try {
+ return delegate.execute();
+ } finally {
+ setCurrentThreadName(oldName);
+ }
+ });
+ V result = future.get(); // Still block for the response
+
+ limitListener.onSuccess();
+ return result;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RejectedExecutionException) {
+ // Storage rejected us, throttle back
+ limitListener.onDropped();
+ } else {
+ limitListener.onIgnore();
+ }
+
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new RuntimeException("Issue while executing on a throttled call", cause);
+ }
+ } catch (InterruptedException e) {
+ limitListener.onIgnore();
+ throw new RuntimeException("Interrupted while blocking on a throttled call", e);
+ } catch (RuntimeException | Error e) {
+ propagateIfFatal(e);
+ // Ignoring in all cases here because storage itself isn't saying we need to throttle. We may still be
+ // write bound, but a drop in concurrency won't necessarily help.
+ limitListener.onIgnore();
+ throw e;
+ }
+ }
+
+ @Override public void enqueue(Callback<V> callback) {
+ try {
+ executor.execute(new QueuedCall(callback));
+ } catch (RuntimeException | Error e) {
+ propagateIfFatal(e);
+ // Ignoring in all cases here because storage itself isn't saying we need to throttle. We may still be
+ // write bound, but a drop in concurrency won't necessarily help.
+ limitListener.onIgnore();
+ throw e;
+ }
+ }
+
+ @Override public void cancel() {
+ canceled = true;
+ if (delegate != null) delegate.cancel();
+ }
+
+ @Override public boolean isCanceled() {
+ return canceled || (delegate != null && delegate.isCanceled());
+ }
+
+ @Override public Call<V> clone() {
+ return new ThrottledCall<>(this);
+ }
+
+ @Override public String toString() {
+ return "Throttled" + supplier;
+ }
+
+ static String setCurrentThreadName(String name) {
+ Thread thread = Thread.currentThread();
+ String originalName = thread.getName();
+ thread.setName(name);
+ return originalName;
+ }
+
+ final class QueuedCall implements Runnable {
+ final Callback<V> callback;
+
+ QueuedCall(Callback<V> callback) {
+ this.callback = callback;
+ }
+
+ @Override public void run() {
+ try {
+ if (isCanceled()) return;
+
+ delegate = ThrottledCall.this.supplier.get();
+
+ String oldName = setCurrentThreadName(delegate.toString());
+ try {
+ enqueueAndWait();
+ } finally {
+ setCurrentThreadName(oldName);
+ }
+ } catch (RuntimeException | Error e) {
+ propagateIfFatal(e);
+ limitListener.onIgnore();
+ callback.onError(e);
+ }
+ }
+
+ void enqueueAndWait() {
+ ThrottledCallback<V> throttleCallback = new ThrottledCallback<>(callback, limitListener);
+ delegate.enqueue(throttleCallback);
+
+ // Need to wait here since the callback call will run asynchronously also.
+ // This ensures we don't exceed our throttle/queue limits.
+ throttleCallback.await();
+ }
+ }
+
+ static final class ThrottledCallback<V> implements Callback<V> {
+ final Callback<V> supplier;
+ final Listener limitListener;
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ThrottledCallback(Callback<V> supplier, Listener limitListener) {
+ this.supplier = supplier;
+ this.limitListener = limitListener;
+ }
+
+ void await() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ limitListener.onIgnore();
+ throw new RuntimeException("Interrupted while blocking on a throttled call", e);
+ }
+ }
+
+ @Override public void onSuccess(V value) {
+ try {
+ limitListener.onSuccess();
+ supplier.onSuccess(value);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override public void onError(Throwable t) {
+ try {
+ if (t instanceof RejectedExecutionException) {
+ limitListener.onDropped();
+ } else {
+ limitListener.onIgnore();
+ }
+
+ supplier.onError(t);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
new file mode 100644
index 0000000..91e7b78
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limit;
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.limit.Gradient2Limit;
+import com.netflix.concurrency.limits.limiter.AbstractLimiter;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import zipkin2.Call;
+import zipkin2.Span;
+import zipkin2.storage.SpanConsumer;
+import zipkin2.storage.SpanStore;
+import zipkin2.storage.StorageComponent;
+
+/**
+ * Delegating implementation that limits requests to the {@link #spanConsumer()} of another {@link
+ * StorageComponent}. The theory here is that this class can be used to:
+ * <ul>
+ * <li>Prevent spamming the storage engine with excessive, spiky requests when they come in; thus
+ * preserving its life.</li>
+ * <li>Optionally act as a buffer so that a fixed number of requests can be queued for execution when
+ * the throttle allows for it. This optional queue must be bounded in order to avoid running out of
+ * memory from infinitely queueing.</li>
+ * </ul>
+ *
+ * @see ThrottledSpanConsumer
+ */
+public final class ThrottledStorageComponent extends StorageComponent {
+ final StorageComponent delegate;
+ final AbstractLimiter<Void> limiter;
+ final ThreadPoolExecutor executor;
+
+ public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry registry,
+ int minConcurrency,
+ int maxConcurrency,
+ int maxQueueSize) {
+ this.delegate = Objects.requireNonNull(delegate);
+
+ Limit limit = Gradient2Limit.newBuilder()
+ .minLimit(minConcurrency)
+ .initialLimit(
+ minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there
+ .maxConcurrency(maxConcurrency)
+ .queueSize(0)
+ .build();
+ this.limiter = new Builder().limit(limit).build();
+
+ // TODO: explain these parameters
+ this.executor = new ThreadPoolExecutor(limit.getLimit(),
+ limit.getLimit(),
+ 0,
+ TimeUnit.DAYS,
+ createQueue(maxQueueSize),
+ new ThottledThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
+
+ limit.notifyOnChange(new ThreadPoolExecutorResizer(executor));
+
+ ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry);
+ metrics.bind(executor);
+ metrics.bind(limiter);
+ }
+
+ @Override public SpanStore spanStore() {
+ return delegate.spanStore();
+ }
+
+ @Override public SpanConsumer spanConsumer() {
+ return new ThrottledSpanConsumer(delegate.spanConsumer(), limiter, executor);
+ }
+
+ @Override public void close() throws IOException {
+ executor.shutdownNow();
+ delegate.close();
+ }
+
+ @Override public String toString() {
+ return "Throttled" + delegate;
+ }
+
+ final class ThrottledSpanConsumer implements SpanConsumer {
+ final SpanConsumer delegate;
+ final Limiter<Void> limiter;
+ final ExecutorService executor;
+
+ ThrottledSpanConsumer(SpanConsumer delegate, Limiter<Void> limiter, ExecutorService executor) {
+ this.delegate = delegate;
+ this.limiter = limiter;
+ this.executor = executor;
+ }
+
+ @Override public Call<Void> accept(List<Span> spans) {
+ return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+ }
+
+ @Override public String toString() {
+ return "Throttled" + delegate;
+ }
+ }
+
+ static BlockingQueue<Runnable> createQueue(int maxSize) {
+ if (maxSize < 0) throw new IllegalArgumentException("maxSize < 0");
+
+ if (maxSize == 0) {
+ // 0 means we should be bounded but we can't create a queue with that size so use 1 instead.
+ maxSize = 1;
+ }
+
+ return new LinkedBlockingQueue<>(maxSize);
+ }
+
+ static final class ThottledThreadFactory implements ThreadFactory {
+ @Override public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("zipkin-throttle-pool-" + thread.getId());
+ return thread;
+ }
+ }
+
+ static final class ThreadPoolExecutorResizer implements Consumer<Integer> {
+ final ThreadPoolExecutor executor;
+
+ ThreadPoolExecutorResizer(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This is {@code synchronized} to ensure that we don't let the core/max pool sizes get out of
+ * sync; even for an instant. The two need to be tightly coupled together to ensure that when
+ * our queue fills up we don't spin up extra Threads beyond our calculated limit.
+ *
+ * <p>There is also an unfortunate aspect where the {@code max} can never be less than
+ * {@code core} or an exception will be thrown. So they have to be adjusted in the appropriate
+ * order relative to the direction the size is going.
+ */
+ @Override public synchronized void accept(Integer newValue) {
+ int previousValue = executor.getCorePoolSize();
+
+ int newValueInt = newValue;
+ if (previousValue < newValueInt) {
+ executor.setMaximumPoolSize(newValueInt);
+ executor.setCorePoolSize(newValueInt);
+ } else if (previousValue > newValueInt) {
+ executor.setCorePoolSize(newValueInt);
+ executor.setMaximumPoolSize(newValueInt);
+ }
+ // Note: no case for equals. Why modify something that doesn't need to be modified?
+ }
+ }
+
+ static final class Builder extends AbstractLimiter.Builder<Builder> {
+ NonLimitingLimiter build() {
+ return new NonLimitingLimiter(this);
+ }
+
+ @Override protected Builder self() {
+ return this;
+ }
+ }
+
+ /**
+ * Unlike a normal Limiter, this will actually not prevent the creation of a {@link Listener} in
+ * {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an
+ * appropriate {@link Limit#getLimit() Limit} while the {@link #executor} handles actually
+ * limiting running requests.
+ */
+ static final class NonLimitingLimiter extends AbstractLimiter<Void> {
+ NonLimitingLimiter(AbstractLimiter.Builder<?> builder) {
+ super(builder);
+ }
+
+ @Override public Optional<Listener> acquire(Void context) {
+ return Optional.of(createListener());
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java
new file mode 100644
index 0000000..fd344db
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+ /**
+  * Mutable settings holder bound to the {@code zipkin.storage.throttle} property prefix. Each
+  * field is exposed through a plain getter/setter pair; no validation happens in this class.
+  */
+ @ConfigurationProperties("zipkin.storage.throttle")
+ public final class ZipkinStorageThrottleProperties {
+
+   /** Should we throttle at all? */
+   private boolean enabled;
+
+   /** Returns whether throttling is switched on. */
+   public boolean isEnabled() {
+     return this.enabled;
+   }
+
+   /** Switches throttling on or off. */
+   public void setEnabled(boolean enabled) {
+     this.enabled = enabled;
+   }
+
+   /** Minimum number of storage requests to allow through at a given time. */
+   private int minConcurrency;
+
+   /** Returns the lower bound on concurrent storage requests. */
+   public int getMinConcurrency() {
+     return this.minConcurrency;
+   }
+
+   /** Sets the lower bound on concurrent storage requests. */
+   public void setMinConcurrency(int minConcurrency) {
+     this.minConcurrency = minConcurrency;
+   }
+
+   /**
+    * Maximum number of storage requests to allow through at a given time. Should be tuned to
+    * (bulk_index_pool_size / num_servers_in_cluster). e.g. 200 (default pool size in
+    * Elasticsearch) / 2 (number of load balanced zipkin-server instances) = 100.
+    */
+   private int maxConcurrency;
+
+   /** Returns the upper bound on concurrent storage requests. */
+   public int getMaxConcurrency() {
+     return this.maxConcurrency;
+   }
+
+   /** Sets the upper bound on concurrent storage requests. */
+   public void setMaxConcurrency(int maxConcurrency) {
+     this.maxConcurrency = maxConcurrency;
+   }
+
+   /**
+    * Maximum number of storage requests to buffer while waiting for an open Thread.
+    * 0 = no buffering.
+    */
+   private int maxQueueSize;
+
+   /** Returns how many storage requests may be buffered while waiting. */
+   public int getMaxQueueSize() {
+     return this.maxQueueSize;
+   }
+
+   /** Sets how many storage requests may be buffered while waiting. */
+   public void setMaxQueueSize(int maxQueueSize) {
+     this.maxQueueSize = maxQueueSize;
+   }
+ }
diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml
index 72b0fe8..009e3a0 100644
--- a/zipkin-server/src/main/resources/zipkin-server-shared.yml
+++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml
@@ -53,6 +53,11 @@ zipkin:
autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000}
autocomplete-cardinality: 20000
type: ${STORAGE_TYPE:mem}
+ throttle:
+ enabled: ${STORAGE_THROTTLE_ENABLED:false}
+ min-concurrency: ${STORAGE_THROTTLE_MIN_CONCURRENCY:10}
+ max-concurrency: ${STORAGE_THROTTLE_MAX_CONCURRENCY:200}
+ max-queue-size: ${STORAGE_THROTTLE_MAX_QUEUE_SIZE:1000}
mem:
# Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged.
# A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt
index 40ec38c..6a5f5f3 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt
@@ -29,7 +29,7 @@ class BasicAuthInterceptorTest {
@Rule @JvmField val thrown: ExpectedException = ExpectedException.none()
var client: OkHttpClient = OkHttpClient.Builder()
- .addNetworkInterceptor(BasicAuthInterceptor(ZipkinElasticsearchStorageProperties()))
+ .addNetworkInterceptor(BasicAuthInterceptor(ZipkinElasticsearchStorageProperties(false, 0)))
.build()
@Test fun intercept_whenESReturns403AndJsonBody_throwsWithResponseBodyMessage() {
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
new file mode 100644
index 0000000..00eb02f
--- /dev/null
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle
+
+import com.netflix.concurrency.limits.Limiter
+import com.netflix.concurrency.limits.Limiter.Listener
+import com.netflix.concurrency.limits.limit.SettableLimit
+import com.netflix.concurrency.limits.limiter.SimpleLimiter
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.Test
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito
+import org.mockito.Mockito.`when`
+import org.mockito.Mockito.doThrow
+import org.mockito.Mockito.verify
+import zipkin2.Call
+import zipkin2.Callback
+import java.io.IOException
+import java.util.Optional
+import java.util.concurrent.Callable
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.Semaphore
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.TimeUnit
+import java.util.function.Supplier
+
+// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
+// refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
+ class ThrottledCallTest {
+   // Shared limit; createThrottle(...) raises it by one each time it builds a throttle.
+   var limit = SettableLimit.startingAt(0)
+   var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+
+   /** Kotlin-friendly shortcut for [Mockito.mock] that infers the mocked type. */
+   inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+
+   /** The delegate supplier must not be invoked until the throttled call is executed. */
+   @Test fun callCreation_isDeferred() {
+     val created = booleanArrayOf(false)
+
+     val throttle = createThrottle(Supplier {
+       created[0] = true
+       Call.create<Void>(null)
+     })
+
+     assertThat(created).contains(false)
+     throttle.execute()
+     assertThat(created).contains(true)
+   }
+
+   /** With one thread and one queue slot occupied, a further execute() must be rejected. */
+   @Test fun execute_isThrottled() {
+     val numThreads = 1
+     val queueSize = 1
+     val totalTasks = numThreads + queueSize
+
+     val startLock = Semaphore(numThreads)
+     val waitLock = Semaphore(totalTasks)
+     val failLock = Semaphore(1)
+     val throttle =
+       createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+
+     // Step 1: drain appropriate locks
+     startLock.drainPermits()
+     waitLock.drainPermits()
+     failLock.drainPermits()
+
+     // Step 2: saturate threads and fill queue
+     val backgroundPool = Executors.newCachedThreadPool()
+     for (i in 0 until totalTasks) {
+       backgroundPool.submit(Callable { throttle.execute() })
+     }
+
+     try {
+       // Step 3: make sure the threads actually started
+       startLock.acquire(numThreads)
+
+       // Step 4: submit something beyond our limits
+       val future = backgroundPool.submit(Callable {
+         try {
+           throttle.execute()
+         } catch (e: IOException) {
+           throw RuntimeException(e)
+         } finally {
+           // Step 6: signal that we tripped the limit
+           failLock.release()
+         }
+       })
+
+       // Step 5: wait to make sure our limit actually tripped
+       failLock.acquire()
+
+       // Step 7: Expect great things. Only the exception from future.get() is captured, so an
+       // unexpected success shows up as a null "thrown" instead of a swallowed AssertionError.
+       val thrown: Throwable? = try {
+         future.get()
+         null
+       } catch (e: ExecutionException) {
+         e
+       }
+       assertThat(thrown)
+         .isInstanceOf(ExecutionException::class.java) // from future.get
+         .hasCauseInstanceOf(RejectedExecutionException::class.java)
+     } finally {
+       waitLock.release(totalTasks)
+       startLock.release(totalTasks)
+       backgroundPool.shutdownNow()
+     }
+   }
+
+   /** A rejection raised by the delegate must be reported to the limiter as dropped. */
+   @Test fun execute_throttlesBack_whenStorageRejects() {
+     val listener: Listener = mock()
+     val call = FakeCall()
+     call.overCapacity = true
+
+     val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+     try {
+       throttle.execute()
+       assertThat(true).isFalse() // should raise a RejectedExecutionException
+     } catch (e: RejectedExecutionException) {
+       verify(listener).onDropped()
+     }
+   }
+
+   /** A rejection raised by the executor itself must be reported to the limiter as ignored. */
+   @Test fun execute_ignoresLimit_whenPoolFull() {
+     val listener: Listener = mock()
+
+     val throttle =
+       ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+     try {
+       throttle.execute()
+       assertThat(true).isFalse() // should raise a RejectedExecutionException
+     } catch (e: RejectedExecutionException) {
+       verify(listener).onIgnore()
+     }
+   }
+
+   /** With one thread and one queue slot occupied, a further enqueue() must be rejected. */
+   @Test fun enqueue_isThrottled() {
+     val numThreads = 1
+     val queueSize = 1
+     val totalTasks = numThreads + queueSize
+
+     val startLock = Semaphore(numThreads)
+     val waitLock = Semaphore(totalTasks)
+     val throttle =
+       createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+
+     // Step 1: drain appropriate locks
+     startLock.drainPermits()
+     waitLock.drainPermits()
+
+     // Step 2: saturate threads and fill queue
+     val callback: Callback<Void> = mock()
+     for (i in 0 until totalTasks) {
+       throttle.enqueue(callback)
+     }
+
+     // Step 3: make sure the threads actually started
+     startLock.acquire(numThreads)
+
+     try {
+       // Step 4: submit something beyond our limits and make sure it fails
+       throttle.enqueue(callback)
+
+       assertThat(true).isFalse() // should raise a RejectedExecutionException
+     } catch (e: RejectedExecutionException) {
+       // expected: the rejection itself is the pass condition
+     } finally {
+       waitLock.release(totalTasks)
+       startLock.release(totalTasks)
+     }
+   }
+
+   /** A rejection delivered through the callback must be reported to the limiter as dropped. */
+   @Test fun enqueue_throttlesBack_whenStorageRejects() {
+     val listener: Listener = mock()
+     val call = FakeCall()
+     call.overCapacity = true
+
+     val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+     val latch = CountDownLatch(1)
+     throttle.enqueue(object : Callback<Void> {
+       override fun onSuccess(value: Void) {
+         latch.countDown()
+       }
+
+       override fun onError(t: Throwable) {
+         latch.countDown()
+       }
+     })
+
+     // await returns false on timeout; fail here rather than with a misleading verify error
+     assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue()
+     verify(listener).onDropped()
+   }
+
+   /** A rejection raised by the executor on enqueue must be reported as ignored. */
+   @Test fun enqueue_ignoresLimit_whenPoolFull() {
+     val listener: Listener = mock()
+
+     val throttle =
+       ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+     try {
+       throttle.enqueue(null)
+       assertThat(true).isFalse() // should raise a RejectedExecutionException
+     } catch (e: RejectedExecutionException) {
+       verify(listener).onIgnore()
+     }
+   }
+
+   /** Builds a throttle backed by a single-thread pool with a single queue slot. */
+   private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
+     return createThrottle(1, 1, delegate)
+   }
+
+   /** Raises the shared limit by one, then builds a throttle over a freshly created pool. */
+   private fun createThrottle(
+     poolSize: Int,
+     queueSize: Int,
+     delegate: Supplier<Call<Void>>
+   ): ThrottledCall<Void> {
+     limit.setLimit(limit.getLimit() + 1)
+     return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
+   }
+
+   /**
+    * Call that announces it has started by releasing [startLock], then blocks until a permit is
+    * available on [waitLock].
+    */
+   private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
+     override fun doExecute(): Void? {
+       try {
+         startLock.release()
+         waitLock.acquire()
+         return null
+       } catch (e: InterruptedException) {
+         Thread.currentThread().interrupt()
+         throw AssertionError(e)
+       }
+     }
+
+     override fun doEnqueue(callback: Callback<Void>) {
+       try {
+         callback.onSuccess(doExecute())
+       } catch (t: Throwable) {
+         callback.onError(t)
+       }
+     }
+
+     override fun clone() = LockedCall(startLock, waitLock)
+   }
+
+   /** Fixed-size pool whose bounded queue rejects work once poolSize + queueSize is reached. */
+   private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
+     return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
+       LinkedBlockingQueue(queueSize))
+   }
+
+   /** Executor mock whose execute and submit always throw RejectedExecutionException. */
+   private fun mockExhaustedPool(): ExecutorService {
+     val mock: ExecutorService = mock()
+     doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
+     doThrow(RejectedExecutionException::class.java).`when`(mock).submit(any(Callable::class.java))
+     return mock
+   }
+
+   /** Limiter mock that always grants the supplied listener. */
+   private fun mockLimiter(listener: Listener): Limiter<Void> {
+     val mock: Limiter<Void> = mock()
+     `when`(mock.acquire(any())).thenReturn(Optional.of(listener))
+     return mock
+   }
+
+   /** Call that succeeds with null, or fails with RejectedExecutionException when over capacity. */
+   private class FakeCall(var overCapacity: Boolean = false) : Call.Base<Void>() {
+     override fun doExecute(): Void? {
+       if (overCapacity) throw RejectedExecutionException()
+       return null
+     }
+
+     override fun doEnqueue(callback: Callback<Void>) {
+       if (overCapacity) {
+         callback.onError(RejectedExecutionException())
+       } else {
+         callback.onSuccess(null)
+       }
+     }
+
+     // Carry the flag over so a clone fails (or succeeds) exactly like the call it came from.
+     override fun clone() = FakeCall(overCapacity)
+   }
+ }
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
new file mode 100644
index 0000000..705967a
--- /dev/null
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle
+
+import com.linecorp.armeria.common.metric.NoopMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.Test
+import zipkin2.storage.InMemoryStorage
+
+ class ThrottledStorageComponentTest {
+   val delegate = InMemoryStorage.newBuilder().build()
+   val registry = NoopMeterRegistry.get()
+
+   /** Wraps the in-memory delegate; only the last constructor argument varies between tests. */
+   private fun throttled(queueSize: Int) =
+     ThrottledStorageComponent(delegate, registry, 1, 2, queueSize)
+
+   /** Calls handed out by the wrapped span consumer must be ThrottledCall instances. */
+   @Test fun spanConsumer_isProxied() {
+     val call = throttled(1).spanConsumer().accept(listOf())
+
+     assertThat(call).isInstanceOf(ThrottledCall::class.java)
+   }
+
+   /** A queue size of zero is accepted: construction alone is the assertion. */
+   @Test fun createComponent_withZeroSizedQueue() {
+     throttled(0)
+     // no exception == pass
+   }
+
+   /** A negative queue size must be refused at construction time. */
+   @Test(expected = IllegalArgumentException::class)
+   fun createComponent_withNegativeQueue() {
+     throttled(-1)
+   }
+
+   /** toString is the delegate's toString with a "Throttled" prefix. */
+   @Test fun niceToString() {
+     val component = throttled(1)
+
+     assertThat(component).hasToString("ThrottledInMemoryStorage{traceCount=0}")
+   }
+ }
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
index 6c5e4b3..507ad8e 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
@@ -105,6 +105,11 @@ public final class HttpCall<V> extends Call<V> {
return new HttpCall<V>(call.clone(), semaphore, bodyConverter);
}
+ /** Renders as {@code HttpCall(...)}, with the string form of the wrapped call inside. */
+ @Override
+ public String toString() {
+   final String delegate = String.valueOf(call);
+   return "HttpCall(" + delegate + ")";
+ }
+
static class V2CallbackAdapter<V> implements okhttp3.Callback {
final Semaphore semaphore;
final BodyConverter<V> bodyConverter;
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index 3057940..ce72238 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -574,4 +574,8 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
return h$;
}
}
+
+ /** Renders as {@code InMemoryStorage{traceCount=N}}, N being the size of the trace ID index. */
+ @Override public String toString() {
+   final int traceCount = traceIdToTraceIdTimeStamps.size();
+   return "InMemoryStorage{traceCount=" + traceCount + "}";
+ }
}