You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/19 18:19:25 UTC
[06/24] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is
using hystrix
CAMEL-9879: Circuit Breaker EIP - That is using hystrix
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3dd001e4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3dd001e4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3dd001e4
Branch: refs/heads/master
Commit: 3dd001e4d3112b6cf19dffe74342a48a568b917c
Parents: ca54a54
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 19 13:13:48 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 19 18:16:45 2016 +0200
----------------------------------------------------------------------
.../model/HystrixConfigurationDefinition.java | 641 +++++++++++++++++++
.../apache/camel/model/HystrixDefinition.java | 65 +-
.../resources/org/apache/camel/model/jaxb.index | 1 +
.../hystrix/processor/HystrixProcessor.java | 9 +-
.../processor/HystrixProcessorCommand.java | 5 +-
.../processor/HystrixProcessorFactory.java | 106 ++-
6 files changed, 808 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3dd001e4/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
new file mode 100644
index 0000000..5ea084e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.spi.Metadata;
+
+@Metadata(label = "eip,routing,circuitbreaker")
+@XmlRootElement(name = "hystrixConfiguration")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class HystrixConfigurationDefinition {
+
+ @XmlTransient
+ private HystrixDefinition parent;
+
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean circuitBreakerEnabled;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "50")
+ private Integer circuitBreakerErrorThresholdPercentage;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "false")
+ private Boolean circuitBreakerForceClosed; // forcing the breaker closed is opt-in; Hystrix defaults this to false
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "false")
+ private Boolean circuitBreakerForceOpen;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "20")
+ private Integer circuitBreakerRequestVolumeThreshold;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "5000")
+ private Integer circuitBreakerSleepWindowInMilliseconds;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "20")
+ private Integer executionIsolationSemaphoreMaxConcurrentRequests;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "THREAD")
+ private String executionIsolationStrategy;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean executionIsolationThreadInterruptOnTimeout;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "1000")
+ private Integer executionTimeoutInMilliseconds;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean executionTimeoutEnabled;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "10")
+ private Integer fallbackIsolationSemaphoreMaxConcurrentRequests;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean fallbackEnabled;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "500")
+ private Integer metricsHealthSnapshotIntervalInMilliseconds;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "10")
+ private Integer metricsRollingPercentileBucketSize;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean metricsRollingPercentileEnabled;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "60000")
+ private Integer metricsRollingPercentileWindowInMilliseconds; // duration in milliseconds, so the documented default must be numeric
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "6")
+ private Integer metricsRollingPercentileWindowBuckets;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "10000")
+ private Integer metricsRollingStatisticalWindowInMilliseconds;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "10")
+ private Integer metricsRollingStatisticalWindowBuckets;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean requestCacheEnabled;
+ @XmlAttribute
+ @Metadata(label = "command", defaultValue = "true")
+ private Boolean requestLogEnabled;
+
+ // thread-pool
+
+ @XmlAttribute
+ @Metadata(label = "threadpool", defaultValue = "10")
+ private Integer corePoolSize;
+ @XmlAttribute
+ @Metadata(label = "threadpool", defaultValue = "1")
+ private Integer keepAliveTime;
+ @XmlAttribute
+ @Metadata(label = "threadpool", defaultValue = "-1")
+ private Integer maxQueueSize;
+ @XmlAttribute
+ @Metadata(label = "threadpool", defaultValue = "5")
+ private Integer queueSizeRejectionThreshold;
+ @XmlAttribute
+ @Metadata(label = "threadpool", defaultValue = "10000")
+ private Integer threadPoolRollingNumberStatisticalWindowInMilliseconds;
+ @XmlAttribute
+ @Metadata(label = "threadpool", defaultValue = "10")
+ private Integer threadPoolRollingNumberStatisticalWindowBuckets;
+
+ public HystrixConfigurationDefinition() {
+ }
+
+ public HystrixConfigurationDefinition(HystrixDefinition parent) {
+ this.parent = parent;
+ }
+
+ // Getter/Setter
+ // -------------------------------------------------------------------------
+
+ public Boolean getCircuitBreakerEnabled() {
+ return circuitBreakerEnabled;
+ }
+
+ public void setCircuitBreakerEnabled(Boolean circuitBreakerEnabled) {
+ this.circuitBreakerEnabled = circuitBreakerEnabled;
+ }
+
+ public Integer getCircuitBreakerErrorThresholdPercentage() {
+ return circuitBreakerErrorThresholdPercentage;
+ }
+
+ public void setCircuitBreakerErrorThresholdPercentage(Integer circuitBreakerErrorThresholdPercentage) {
+ this.circuitBreakerErrorThresholdPercentage = circuitBreakerErrorThresholdPercentage;
+ }
+
+ public Boolean getCircuitBreakerForceClosed() {
+ return circuitBreakerForceClosed;
+ }
+
+ public void setCircuitBreakerForceClosed(Boolean circuitBreakerForceClosed) {
+ this.circuitBreakerForceClosed = circuitBreakerForceClosed;
+ }
+
+ public Boolean getCircuitBreakerForceOpen() {
+ return circuitBreakerForceOpen;
+ }
+
+ public void setCircuitBreakerForceOpen(Boolean circuitBreakerForceOpen) {
+ this.circuitBreakerForceOpen = circuitBreakerForceOpen;
+ }
+
+ public Integer getCircuitBreakerRequestVolumeThreshold() {
+ return circuitBreakerRequestVolumeThreshold;
+ }
+
+ public void setCircuitBreakerRequestVolumeThreshold(Integer circuitBreakerRequestVolumeThreshold) {
+ this.circuitBreakerRequestVolumeThreshold = circuitBreakerRequestVolumeThreshold;
+ }
+
+ public Integer getCircuitBreakerSleepWindowInMilliseconds() {
+ return circuitBreakerSleepWindowInMilliseconds;
+ }
+
+ public void setCircuitBreakerSleepWindowInMilliseconds(Integer circuitBreakerSleepWindowInMilliseconds) {
+ this.circuitBreakerSleepWindowInMilliseconds = circuitBreakerSleepWindowInMilliseconds;
+ }
+
+ public Integer getExecutionIsolationSemaphoreMaxConcurrentRequests() {
+ return executionIsolationSemaphoreMaxConcurrentRequests;
+ }
+
+ public void setExecutionIsolationSemaphoreMaxConcurrentRequests(Integer executionIsolationSemaphoreMaxConcurrentRequests) {
+ this.executionIsolationSemaphoreMaxConcurrentRequests = executionIsolationSemaphoreMaxConcurrentRequests;
+ }
+
+ public String getExecutionIsolationStrategy() {
+ return executionIsolationStrategy;
+ }
+
+ public void setExecutionIsolationStrategy(String executionIsolationStrategy) {
+ this.executionIsolationStrategy = executionIsolationStrategy;
+ }
+
+ public Boolean getExecutionIsolationThreadInterruptOnTimeout() {
+ return executionIsolationThreadInterruptOnTimeout;
+ }
+
+ public void setExecutionIsolationThreadInterruptOnTimeout(Boolean executionIsolationThreadInterruptOnTimeout) {
+ this.executionIsolationThreadInterruptOnTimeout = executionIsolationThreadInterruptOnTimeout;
+ }
+
+ public Integer getExecutionTimeoutInMilliseconds() {
+ return executionTimeoutInMilliseconds;
+ }
+
+ public void setExecutionTimeoutInMilliseconds(Integer executionTimeoutInMilliseconds) {
+ this.executionTimeoutInMilliseconds = executionTimeoutInMilliseconds;
+ }
+
+ public Boolean getExecutionTimeoutEnabled() {
+ return executionTimeoutEnabled;
+ }
+
+ public void setExecutionTimeoutEnabled(Boolean executionTimeoutEnabled) {
+ this.executionTimeoutEnabled = executionTimeoutEnabled;
+ }
+
+ public Integer getFallbackIsolationSemaphoreMaxConcurrentRequests() {
+ return fallbackIsolationSemaphoreMaxConcurrentRequests;
+ }
+
+ public void setFallbackIsolationSemaphoreMaxConcurrentRequests(Integer fallbackIsolationSemaphoreMaxConcurrentRequests) {
+ this.fallbackIsolationSemaphoreMaxConcurrentRequests = fallbackIsolationSemaphoreMaxConcurrentRequests;
+ }
+
+ public Boolean getFallbackEnabled() {
+ return fallbackEnabled;
+ }
+
+ public void setFallbackEnabled(Boolean fallbackEnabled) {
+ this.fallbackEnabled = fallbackEnabled;
+ }
+
+ public Integer getMetricsHealthSnapshotIntervalInMilliseconds() {
+ return metricsHealthSnapshotIntervalInMilliseconds;
+ }
+
+ public void setMetricsHealthSnapshotIntervalInMilliseconds(Integer metricsHealthSnapshotIntervalInMilliseconds) {
+ this.metricsHealthSnapshotIntervalInMilliseconds = metricsHealthSnapshotIntervalInMilliseconds;
+ }
+
+ public Integer getMetricsRollingPercentileBucketSize() {
+ return metricsRollingPercentileBucketSize;
+ }
+
+ public void setMetricsRollingPercentileBucketSize(Integer metricsRollingPercentileBucketSize) {
+ this.metricsRollingPercentileBucketSize = metricsRollingPercentileBucketSize;
+ }
+
+ public Boolean getMetricsRollingPercentileEnabled() {
+ return metricsRollingPercentileEnabled;
+ }
+
+ public void setMetricsRollingPercentileEnabled(Boolean metricsRollingPercentileEnabled) {
+ this.metricsRollingPercentileEnabled = metricsRollingPercentileEnabled;
+ }
+
+ public Integer getMetricsRollingPercentileWindowInMilliseconds() {
+ return metricsRollingPercentileWindowInMilliseconds;
+ }
+
+ public void setMetricsRollingPercentileWindowInMilliseconds(Integer metricsRollingPercentileWindowInMilliseconds) {
+ this.metricsRollingPercentileWindowInMilliseconds = metricsRollingPercentileWindowInMilliseconds;
+ }
+
+ public Integer getMetricsRollingPercentileWindowBuckets() {
+ return metricsRollingPercentileWindowBuckets;
+ }
+
+ public void setMetricsRollingPercentileWindowBuckets(Integer metricsRollingPercentileWindowBuckets) {
+ this.metricsRollingPercentileWindowBuckets = metricsRollingPercentileWindowBuckets;
+ }
+
+ public Integer getMetricsRollingStatisticalWindowInMilliseconds() {
+ return metricsRollingStatisticalWindowInMilliseconds;
+ }
+
+ public void setMetricsRollingStatisticalWindowInMilliseconds(Integer metricsRollingStatisticalWindowInMilliseconds) {
+ this.metricsRollingStatisticalWindowInMilliseconds = metricsRollingStatisticalWindowInMilliseconds;
+ }
+
+ public Integer getMetricsRollingStatisticalWindowBuckets() {
+ return metricsRollingStatisticalWindowBuckets;
+ }
+
+ public void setMetricsRollingStatisticalWindowBuckets(Integer metricsRollingStatisticalWindowBuckets) {
+ this.metricsRollingStatisticalWindowBuckets = metricsRollingStatisticalWindowBuckets;
+ }
+
+ public Boolean getRequestCacheEnabled() {
+ return requestCacheEnabled;
+ }
+
+ public void setRequestCacheEnabled(Boolean requestCacheEnabled) {
+ this.requestCacheEnabled = requestCacheEnabled;
+ }
+
+ public Boolean getRequestLogEnabled() {
+ return requestLogEnabled;
+ }
+
+ public void setRequestLogEnabled(Boolean requestLogEnabled) {
+ this.requestLogEnabled = requestLogEnabled;
+ }
+
+ public Integer getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ public void setCorePoolSize(Integer corePoolSize) {
+ this.corePoolSize = corePoolSize;
+ }
+
+ public Integer getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
+ public void setKeepAliveTime(Integer keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public Integer getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public void setMaxQueueSize(Integer maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ public Integer getQueueSizeRejectionThreshold() {
+ return queueSizeRejectionThreshold;
+ }
+
+ public void setQueueSizeRejectionThreshold(Integer queueSizeRejectionThreshold) {
+ this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
+ }
+
+ public Integer getThreadPoolRollingNumberStatisticalWindowInMilliseconds() {
+ return threadPoolRollingNumberStatisticalWindowInMilliseconds;
+ }
+
+ public void setThreadPoolRollingNumberStatisticalWindowInMilliseconds(Integer threadPoolRollingNumberStatisticalWindowInMilliseconds) {
+ this.threadPoolRollingNumberStatisticalWindowInMilliseconds = threadPoolRollingNumberStatisticalWindowInMilliseconds;
+ }
+
+ public Integer getThreadPoolRollingNumberStatisticalWindowBuckets() {
+ return threadPoolRollingNumberStatisticalWindowBuckets;
+ }
+
+ public void setThreadPoolRollingNumberStatisticalWindowBuckets(Integer threadPoolRollingNumberStatisticalWindowBuckets) {
+ this.threadPoolRollingNumberStatisticalWindowBuckets = threadPoolRollingNumberStatisticalWindowBuckets;
+ }
+
+
+ // Fluent API
+ // -------------------------------------------------------------------------
+
+ /**
+ * Whether to use a HystrixCircuitBreaker or not. If false no circuit-breaker logic will be used and all requests permitted.
+ * <p>
+ * This is similar in effect to circuitBreakerForceClosed(), except that circuitBreakerForceClosed() continues tracking metrics and knowing
+ * whether it should be open/closed, whereas this property results in not even instantiating a circuit-breaker.
+ */
+ public HystrixConfigurationDefinition circuitBreakerEnabled(Boolean circuitBreakerEnabled) {
+ this.circuitBreakerEnabled = circuitBreakerEnabled;
+ return this;
+ }
+
+ /**
+ * Error percentage threshold (as whole number such as 50) at which point the circuit breaker will trip open and reject requests.
+ * <p>
+ * It will stay tripped for the duration defined in circuitBreakerSleepWindowInMilliseconds;
+ * <p>
+ * The error percentage this is compared against comes from HystrixCommandMetrics.getHealthCounts().
+ */
+ public HystrixConfigurationDefinition circuitBreakerErrorThresholdPercentage(Integer circuitBreakerErrorThresholdPercentage) {
+ this.circuitBreakerErrorThresholdPercentage = circuitBreakerErrorThresholdPercentage;
+ return this;
+ }
+
+ /**
+ * If true the HystrixCircuitBreaker#allowRequest() will always return true to allow requests regardless of
+ * the error percentage from HystrixCommandMetrics.getHealthCounts().
+ * <p>
+ * The circuitBreakerForceOpen() property takes precedence, so if it is set to true this property does nothing.
+ */
+ public HystrixConfigurationDefinition circuitBreakerForceClosed(Boolean circuitBreakerForceClosed) {
+ this.circuitBreakerForceClosed = circuitBreakerForceClosed;
+ return this;
+ }
+
+ /**
+ * If true the HystrixCircuitBreaker.allowRequest() will always return false, causing the circuit to be open (tripped) and reject all requests.
+ * <p>
+ * This property takes precedence over circuitBreakerForceClosed();
+ */
+ public HystrixConfigurationDefinition circuitBreakerForceOpen(Boolean circuitBreakerForceOpen) {
+ this.circuitBreakerForceOpen = circuitBreakerForceOpen;
+ return this;
+ }
+
+ /**
+ * Minimum number of requests in the metricsRollingStatisticalWindowInMilliseconds() that must exist before the HystrixCircuitBreaker will trip.
+ * <p>
+ * If below this number the circuit will not trip regardless of error percentage.
+ */
+ public HystrixConfigurationDefinition circuitBreakerRequestVolumeThreshold(Integer circuitBreakerRequestVolumeThreshold) {
+ this.circuitBreakerRequestVolumeThreshold = circuitBreakerRequestVolumeThreshold;
+ return this;
+ }
+
+ /**
+ * The time in milliseconds after a HystrixCircuitBreaker trips open that it should wait before trying requests again.
+ */
+ public HystrixConfigurationDefinition circuitBreakerSleepWindowInMilliseconds(Integer circuitBreakerSleepWindowInMilliseconds) {
+ this.circuitBreakerSleepWindowInMilliseconds = circuitBreakerSleepWindowInMilliseconds;
+ return this;
+ }
+
+ /**
+ * Number of concurrent requests permitted to HystrixCommand.run(). Requests beyond the concurrent limit will be rejected.
+ * <p>
+ * Applicable only when executionIsolationStrategy == SEMAPHORE.
+ */
+ public HystrixConfigurationDefinition executionIsolationSemaphoreMaxConcurrentRequests(Integer executionIsolationSemaphoreMaxConcurrentRequests) {
+ this.executionIsolationSemaphoreMaxConcurrentRequests = executionIsolationSemaphoreMaxConcurrentRequests;
+ return this;
+ }
+
+ /**
+ * What isolation strategy HystrixCommand.run() will be executed with.
+ * <p>
+ * If THREAD then it will be executed on a separate thread and concurrent requests limited by the number of threads in the thread-pool.
+ * <p>
+ * If SEMAPHORE then it will be executed on the calling thread and concurrent requests limited by the semaphore count.
+ */
+ public HystrixConfigurationDefinition executionIsolationStrategy(String executionIsolationStrategy) {
+ this.executionIsolationStrategy = executionIsolationStrategy;
+ return this;
+ }
+
+ /**
+ * Whether the execution thread should attempt an interrupt (using {@link Future#cancel}) when a thread times out.
+ * <p>
+ * Applicable only when executionIsolationStrategy() == THREAD.
+ */
+ public HystrixConfigurationDefinition executionIsolationThreadInterruptOnTimeout(Boolean executionIsolationThreadInterruptOnTimeout) {
+ this.executionIsolationThreadInterruptOnTimeout = executionIsolationThreadInterruptOnTimeout;
+ return this;
+ }
+
+ /**
+ * Time in milliseconds at which point the command will timeout and halt execution.
+ * <p>
+ * If {@link #executionIsolationThreadInterruptOnTimeout} == true and the command is thread-isolated, the executing thread will be interrupted.
+ * If the command is semaphore-isolated and a HystrixObservableCommand, that command will get unsubscribed.
+ */
+ public HystrixConfigurationDefinition executionTimeoutInMilliseconds(Integer executionTimeoutInMilliseconds) {
+ this.executionTimeoutInMilliseconds = executionTimeoutInMilliseconds;
+ return this;
+ }
+
+ /**
+ * Whether the timeout mechanism is enabled for this command
+ */
+ public HystrixConfigurationDefinition executionTimeoutEnabled(Boolean executionTimeoutEnabled) {
+ this.executionTimeoutEnabled = executionTimeoutEnabled;
+ return this;
+ }
+
+ /**
+ * Number of concurrent requests permitted to HystrixCommand.getFallback().
+ * Requests beyond the concurrent limit will fail-fast and not attempt retrieving a fallback.
+ */
+ public HystrixConfigurationDefinition fallbackIsolationSemaphoreMaxConcurrentRequests(Integer fallbackIsolationSemaphoreMaxConcurrentRequests) {
+ this.fallbackIsolationSemaphoreMaxConcurrentRequests = fallbackIsolationSemaphoreMaxConcurrentRequests;
+ return this;
+ }
+
+ /**
+ * Whether HystrixCommand.getFallback() should be attempted when failure occurs.
+ */
+ public HystrixConfigurationDefinition fallbackEnabled(Boolean fallbackEnabled) {
+ this.fallbackEnabled = fallbackEnabled;
+ return this;
+ }
+
+ /**
+ * Time in milliseconds to wait between allowing health snapshots to be taken that calculate success and error
+ * percentages and affect HystrixCircuitBreaker.isOpen() status.
+ * <p>
+ * On high-volume circuits the continual calculation of error percentage can become CPU intensive thus this controls how often it is calculated.
+ */
+ public HystrixConfigurationDefinition metricsHealthSnapshotIntervalInMilliseconds(Integer metricsHealthSnapshotIntervalInMilliseconds) {
+ this.metricsHealthSnapshotIntervalInMilliseconds = metricsHealthSnapshotIntervalInMilliseconds;
+ return this;
+ }
+
+ /**
+ * Maximum number of values stored in each bucket of the rolling percentile.
+ * This is passed into HystrixRollingPercentile inside HystrixCommandMetrics.
+ */
+ public HystrixConfigurationDefinition metricsRollingPercentileBucketSize(Integer metricsRollingPercentileBucketSize) {
+ this.metricsRollingPercentileBucketSize = metricsRollingPercentileBucketSize;
+ return this;
+ }
+
+ /**
+ * Whether percentile metrics should be captured using HystrixRollingPercentile inside HystrixCommandMetrics.
+ */
+ public HystrixConfigurationDefinition metricsRollingPercentileEnabled(Boolean metricsRollingPercentileEnabled) {
+ this.metricsRollingPercentileEnabled = metricsRollingPercentileEnabled;
+ return this;
+ }
+
+ /**
+ * Duration of percentile rolling window in milliseconds.
+ * This is passed into HystrixRollingPercentile inside HystrixCommandMetrics.
+ */
+ public HystrixConfigurationDefinition metricsRollingPercentileWindowInMilliseconds(Integer metricsRollingPercentileWindowInMilliseconds) {
+ this.metricsRollingPercentileWindowInMilliseconds = metricsRollingPercentileWindowInMilliseconds;
+ return this;
+ }
+
+ /**
+ * Number of buckets the rolling percentile window is broken into.
+ * This is passed into HystrixRollingPercentile inside HystrixCommandMetrics.
+ */
+ public HystrixConfigurationDefinition metricsRollingPercentileWindowBuckets(Integer metricsRollingPercentileWindowBuckets) {
+ this.metricsRollingPercentileWindowBuckets = metricsRollingPercentileWindowBuckets;
+ return this;
+ }
+
+ /**
+ * Duration of statistical rolling window in milliseconds.
+ * This is passed into HystrixRollingNumber inside HystrixCommandMetrics.
+ */
+ public HystrixConfigurationDefinition metricsRollingStatisticalWindowInMilliseconds(Integer metricsRollingStatisticalWindowInMilliseconds) {
+ this.metricsRollingStatisticalWindowInMilliseconds = metricsRollingStatisticalWindowInMilliseconds;
+ return this;
+ }
+
+ /**
+ * Number of buckets the rolling statistical window is broken into.
+ * This is passed into HystrixRollingNumber inside HystrixCommandMetrics.
+ */
+ public HystrixConfigurationDefinition metricsRollingStatisticalWindowBuckets(Integer metricsRollingStatisticalWindowBuckets) {
+ this.metricsRollingStatisticalWindowBuckets = metricsRollingStatisticalWindowBuckets;
+ return this;
+ }
+
+ /**
+ * Whether HystrixCommand.getCacheKey() should be used with HystrixRequestCache
+ * to provide de-duplication functionality via request-scoped caching.
+ */
+ public HystrixConfigurationDefinition requestCacheEnabled(Boolean requestCacheEnabled) {
+ this.requestCacheEnabled = requestCacheEnabled;
+ return this;
+ }
+
+ /**
+ * Whether HystrixCommand execution and events should be logged to HystrixRequestLog.
+ */
+ public HystrixConfigurationDefinition requestLogEnabled(Boolean requestLogEnabled) {
+ this.requestLogEnabled = requestLogEnabled;
+ return this;
+ }
+
+ /**
+ * Core thread-pool size that gets passed to {@link java.util.concurrent.ThreadPoolExecutor#setCorePoolSize(int)}
+ */
+ public HystrixConfigurationDefinition corePoolSize(Integer corePoolSize) {
+ this.corePoolSize = corePoolSize;
+ return this;
+ }
+
+ /**
+ * Keep-alive time in minutes that gets passed to {@link ThreadPoolExecutor#setKeepAliveTime(long, TimeUnit)}
+ */
+ public HystrixConfigurationDefinition keepAliveTime(Integer keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ return this;
+ }
+
+ /**
+ * Max queue size that gets passed to {@link BlockingQueue} in HystrixConcurrencyStrategy.getBlockingQueue(int)
+ *
+ * This should only affect the instantiation of a threadpool - it is not eligible to change a queue size on the fly.
+ * For that, use queueSizeRejectionThreshold().
+ */
+ public HystrixConfigurationDefinition maxQueueSize(Integer maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ return this;
+ }
+
+ /**
+ * Queue size rejection threshold is an artificial "max" size at which rejections will occur even
+ * if {@link #maxQueueSize} has not been reached. This is done because the {@link #maxQueueSize}
+ * of a {@link BlockingQueue} can not be dynamically changed and we want to support dynamically
+ * changing the queue size that affects rejections.
+ * <p>
+ * This is used by HystrixCommand when queuing a thread for execution.
+ */
+ public HystrixConfigurationDefinition queueSizeRejectionThreshold(Integer queueSizeRejectionThreshold) {
+ this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
+ return this;
+ }
+
+ /**
+ * Duration of statistical rolling window in milliseconds.
+ * This is passed into HystrixRollingNumber inside each HystrixThreadPoolMetrics instance.
+ */
+ public HystrixConfigurationDefinition threadPoolRollingNumberStatisticalWindowInMilliseconds(Integer threadPoolRollingNumberStatisticalWindowInMilliseconds) {
+ this.threadPoolRollingNumberStatisticalWindowInMilliseconds = threadPoolRollingNumberStatisticalWindowInMilliseconds;
+ return this;
+ }
+
+ /**
+ * Number of buckets the rolling statistical window is broken into.
+ * This is passed into HystrixRollingNumber inside each HystrixThreadPoolMetrics instance.
+ */
+ public HystrixConfigurationDefinition threadPoolRollingNumberStatisticalWindowBuckets(Integer threadPoolRollingNumberStatisticalWindowBuckets) {
+ this.threadPoolRollingNumberStatisticalWindowBuckets = threadPoolRollingNumberStatisticalWindowBuckets;
+ return this;
+ }
+
+ /**
+ * End of configuration
+ */
+ public HystrixDefinition end() {
+ return parent;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3dd001e4/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
index c8abd53..171da30 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
@@ -16,10 +16,13 @@
*/
package org.apache.camel.model;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.camel.Processor;
@@ -32,6 +35,10 @@ import org.apache.camel.spi.RouteContext;
public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
@XmlElement
+ private HystrixConfigurationDefinition hystrixConfiguration;
+ @XmlElementRef
+ protected List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
+ @XmlElement
private FallbackDefinition fallback;
public HystrixDefinition() {
@@ -52,6 +59,23 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
throw new IllegalStateException("Cannot find camel-hystrix on the classpath.");
}
+ public List<ProcessorDefinition<?>> getOutputs() {
+ return outputs;
+ }
+
+ public boolean isOutputSupported() {
+ return true;
+ }
+
+ public void setOutputs(List<ProcessorDefinition<?>> outputs) {
+ this.outputs = outputs;
+ if (outputs != null) {
+ for (ProcessorDefinition<?> output : outputs) {
+ configureChild(output);
+ }
+ }
+ }
+
@Override
public void addOutput(ProcessorDefinition<?> output) {
if (output instanceof FallbackDefinition) {
@@ -74,14 +98,6 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
return super.end();
}
- public FallbackDefinition getFallback() {
- return fallback;
- }
-
- public void setFallback(FallbackDefinition fallback) {
- this.fallback = fallback;
- }
-
protected void preCreateProcessor() {
// move the fallback from outputs to fallback which we need to ensure
// such as when using the XML DSL
@@ -95,13 +111,30 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
}
}
+ // Getter/Setter
+ // -------------------------------------------------------------------------
+
+ public FallbackDefinition getFallback() {
+ return fallback;
+ }
+
+ public void setFallback(FallbackDefinition fallback) {
+ this.fallback = fallback;
+ }
+
+ public HystrixConfigurationDefinition getHystrixConfiguration() {
+ return hystrixConfiguration;
+ }
+
+ public void setHystrixConfiguration(HystrixConfigurationDefinition hystrixConfiguration) {
+ this.hystrixConfiguration = hystrixConfiguration;
+ }
+
// Fluent API
// -------------------------------------------------------------------------
/**
- * Sets the otherwise node
- *
- * @return the builder
+ * Sets the fallback node
*/
public HystrixDefinition fallback() {
fallback = new FallbackDefinition();
@@ -109,4 +142,14 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
return this;
}
+ /**
+ * Configures the Hystrix EIP
+ * <p/>
+ * Use <tt>end</tt> when configuration is complete, to return back to the Hystrix EIP.
+ */
+ public HystrixConfigurationDefinition configure() {
+ hystrixConfiguration = new HystrixConfigurationDefinition(this);
+ return hystrixConfiguration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3dd001e4/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index 1d80346..9315335 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -32,6 +32,7 @@ FilterDefinition
FinallyDefinition
FromDefinition
HystrixDefinition
+HystrixConfigurationDefinition
IdempotentConsumerDefinition
InOnlyDefinition
InOutDefinition
http://git-wip-us.apache.org/repos/asf/camel/blob/3dd001e4/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index c3b272d..3400af8 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.hystrix.processor;
import java.util.ArrayList;
import java.util.List;
-import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommand;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
@@ -36,11 +36,13 @@ import org.apache.camel.util.AsyncProcessorHelper;
public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
private String id;
+ private final HystrixCommand.Setter setter;
private final AsyncProcessor processor;
private final AsyncProcessor fallback;
- public HystrixProcessor(String id, Processor processor, Processor fallback) {
+ public HystrixProcessor(String id, HystrixCommand.Setter setter, Processor processor, Processor fallback) {
this.id = id;
+ this.setter = setter;
this.processor = AsyncProcessorConverterHelper.convert(processor);
this.fallback = AsyncProcessorConverterHelper.convert(fallback);
}
@@ -85,8 +87,7 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- HystrixCommandGroupKey key = HystrixCommandGroupKey.Factory.asKey(id);
- HystrixProcessorCommand command = new HystrixProcessorCommand(key, exchange, callback, processor, fallback);
+ HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback);
try {
command.queue();
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3dd001e4/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 16c0645..c728d6a 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -35,9 +35,8 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
private final AsyncProcessor processor;
private final AsyncProcessor fallback;
- public HystrixProcessorCommand(HystrixCommandGroupKey group, Exchange exchange, AsyncCallback callback,
- AsyncProcessor processor, AsyncProcessor fallback) {
- super(group);
+ public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) {
+ super(setter);
this.exchange = exchange;
this.callback = callback;
this.processor = processor;
http://git-wip-us.apache.org/repos/asf/camel/blob/3dd001e4/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
index b6d3a80..8e4a7f1 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
@@ -16,7 +16,12 @@
*/
package org.apache.camel.component.hystrix.processor;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommandProperties;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.apache.camel.Processor;
+import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.spi.ProcessorFactory;
@@ -46,7 +51,106 @@ public class HystrixProcessorFactory implements ProcessorFactory {
fallback = cb.getFallback().createProcessor(routeContext);
}
- return new HystrixProcessor(id, processor, fallback);
+ // create setter using the default options
+ HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(id));
+ HystrixCommandProperties.Setter command = HystrixCommandProperties.Setter();
+ setter.andCommandPropertiesDefaults(command);
+ HystrixThreadPoolProperties.Setter threadPool = HystrixThreadPoolProperties.Setter();
+ setter.andThreadPoolPropertiesDefaults(threadPool);
+
+ // if any custom configuration is provided, apply it on top of the default options
+ if (cb.getHystrixConfiguration() != null) {
+ HystrixConfigurationDefinition config = cb.getHystrixConfiguration();
+
+ // command
+ if (config.getCircuitBreakerEnabled() != null) {
+ command.withCircuitBreakerEnabled(config.getCircuitBreakerEnabled());
+ }
+ if (config.getCircuitBreakerErrorThresholdPercentage() != null) {
+ command.withCircuitBreakerErrorThresholdPercentage(config.getCircuitBreakerErrorThresholdPercentage());
+ }
+ if (config.getCircuitBreakerForceClosed() != null) {
+ command.withCircuitBreakerForceClosed(config.getCircuitBreakerForceClosed());
+ }
+ if (config.getCircuitBreakerForceOpen() != null) {
+ command.withCircuitBreakerForceOpen(config.getCircuitBreakerForceOpen());
+ }
+ if (config.getCircuitBreakerRequestVolumeThreshold() != null) {
+ command.withCircuitBreakerRequestVolumeThreshold(config.getCircuitBreakerRequestVolumeThreshold());
+ }
+ if (config.getCircuitBreakerSleepWindowInMilliseconds() != null) {
+ command.withCircuitBreakerSleepWindowInMilliseconds(config.getCircuitBreakerSleepWindowInMilliseconds());
+ }
+ if (config.getExecutionIsolationSemaphoreMaxConcurrentRequests() != null) {
+ command.withExecutionIsolationSemaphoreMaxConcurrentRequests(config.getExecutionIsolationSemaphoreMaxConcurrentRequests());
+ }
+ if (config.getExecutionIsolationStrategy() != null) {
+ command.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.valueOf(config.getExecutionIsolationStrategy()));
+ }
+ if (config.getExecutionIsolationThreadInterruptOnTimeout() != null) {
+ command.withExecutionIsolationThreadInterruptOnTimeout(config.getExecutionIsolationThreadInterruptOnTimeout());
+ }
+ if (config.getExecutionTimeoutInMilliseconds() != null) {
+ command.withExecutionTimeoutInMilliseconds(config.getExecutionTimeoutInMilliseconds());
+ }
+ if (config.getExecutionTimeoutEnabled() != null) {
+ command.withExecutionTimeoutEnabled(config.getExecutionTimeoutEnabled());
+ }
+ if (config.getFallbackIsolationSemaphoreMaxConcurrentRequests() != null) {
+ command.withFallbackIsolationSemaphoreMaxConcurrentRequests(config.getFallbackIsolationSemaphoreMaxConcurrentRequests());
+ }
+ if (config.getFallbackEnabled() != null) {
+ command.withFallbackEnabled(config.getFallbackEnabled());
+ }
+ if (config.getMetricsHealthSnapshotIntervalInMilliseconds() != null) {
+ command.withMetricsHealthSnapshotIntervalInMilliseconds(config.getMetricsHealthSnapshotIntervalInMilliseconds());
+ }
+ if (config.getMetricsRollingPercentileBucketSize() != null) {
+ command.withMetricsRollingPercentileBucketSize(config.getMetricsRollingPercentileBucketSize());
+ }
+ if (config.getMetricsRollingPercentileEnabled() != null) {
+ command.withMetricsRollingPercentileEnabled(config.getMetricsRollingPercentileEnabled());
+ }
+ if (config.getMetricsRollingPercentileWindowInMilliseconds() != null) {
+ command.withMetricsRollingPercentileWindowInMilliseconds(config.getMetricsRollingPercentileWindowInMilliseconds());
+ }
+ if (config.getMetricsRollingPercentileWindowBuckets() != null) {
+ command.withMetricsRollingPercentileWindowBuckets(config.getMetricsRollingPercentileWindowBuckets());
+ }
+ if (config.getMetricsRollingStatisticalWindowInMilliseconds() != null) {
+ command.withMetricsRollingStatisticalWindowInMilliseconds(config.getMetricsRollingStatisticalWindowInMilliseconds());
+ }
+ if (config.getMetricsRollingStatisticalWindowBuckets() != null) {
+ command.withMetricsRollingStatisticalWindowBuckets(config.getMetricsRollingStatisticalWindowBuckets());
+ }
+ if (config.getRequestCacheEnabled() != null) {
+ command.withRequestCacheEnabled(config.getRequestCacheEnabled());
+ }
+ if (config.getRequestLogEnabled() != null) {
+ command.withRequestLogEnabled(config.getRequestLogEnabled());
+ }
+ // thread pool
+ if (config.getCorePoolSize() != null) {
+ threadPool.withCoreSize(config.getCorePoolSize());
+ }
+ if (config.getKeepAliveTime() != null) {
+ threadPool.withKeepAliveTimeMinutes(config.getKeepAliveTime());
+ }
+ if (config.getMaxQueueSize() != null) {
+ threadPool.withMaxQueueSize(config.getMaxQueueSize());
+ }
+ if (config.getQueueSizeRejectionThreshold() != null) {
+ threadPool.withQueueSizeRejectionThreshold(config.getQueueSizeRejectionThreshold());
+ }
+ if (config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds() != null) {
+ threadPool.withMetricsRollingStatisticalWindowInMilliseconds(config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds());
+ }
+ if (config.getThreadPoolRollingNumberStatisticalWindowBuckets() != null) {
+ threadPool.withMetricsRollingStatisticalWindowBuckets(config.getThreadPoolRollingNumberStatisticalWindowBuckets());
+ }
+ }
+
+ return new HystrixProcessor(id, setter, processor, fallback);
} else {
return null;
}