You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/23 07:23:12 UTC
[camel] 01/02: CAMEL-14426: camel-core - Optimize inflight repository
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4e945fa44153ca970d4f3d0782bfee3b764c5973
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 23 08:02:00 2020 +0100
CAMEL-14426: camel-core - Optimize inflight repository
---
.../camel/blueprint/CamelContextFactoryBean.java | 11 +++++++
.../camel/cdi/xml/CamelContextFactoryBean.java | 12 ++++++++
.../camel/spring/CamelContextFactoryBean.java | 16 ++++++++++
.../org/apache/camel/spi/InflightRepository.java | 16 ++++++++++
.../impl/engine/DefaultInflightRepository.java | 34 ++++++++++++++++++++--
.../core/xml/AbstractCamelContextFactoryBean.java | 5 ++++
.../InflightRepositoryBrowseFromRouteTest.java | 8 +++++
.../camel/impl/InflightRepositoryBrowseTest.java | 8 +++++
.../camel/main/DefaultConfigurationConfigurer.java | 2 ++
.../camel/main/DefaultConfigurationProperties.java | 28 ++++++++++++++++++
.../camel-main-configuration-metadata.json | 8 ++++-
.../mbean/ManagedInflightRepositoryMBean.java | 3 ++
.../mbean/ManagedInflightRepository.java | 5 ++++
.../management/ManagedInflightRepositoryTest.java | 3 ++
.../management/ManagedInflightStatisticsTest.java | 8 +++++
15 files changed, 163 insertions(+), 4 deletions(-)
diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java
index 12f04c3..4db65ae 100644
--- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java
+++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java
@@ -140,6 +140,8 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Blu
@XmlAttribute
private Boolean typeConverterStatisticsEnabled;
@XmlAttribute
+ private Boolean inflightRepositoryExchangeEnabled;
+ @XmlAttribute
private TypeConverterExists typeConverterExists;
@XmlAttribute
private LoggingLevel typeConverterExistsLoggingLevel;
@@ -494,6 +496,15 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Blu
}
@Override
+ public Boolean getInflightRepositoryExchangeEnabled() {
+ return inflightRepositoryExchangeEnabled;
+ }
+
+ public void setInflightRepositoryExchangeEnabled(Boolean inflightRepositoryExchangeEnabled) {
+ this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled;
+ }
+
+ @Override
public TypeConverterExists getTypeConverterExists() {
return typeConverterExists;
}
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java
index f271c76..797bc8b 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java
@@ -147,6 +147,9 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Def
private Boolean typeConverterStatisticsEnabled;
@XmlAttribute
+ private Boolean inflightRepositoryExchangeEnabled;
+
+ @XmlAttribute
private TypeConverterExists typeConverterExists;
@XmlAttribute
@@ -757,6 +760,15 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Def
}
@Override
+ public Boolean getInflightRepositoryExchangeEnabled() {
+ return inflightRepositoryExchangeEnabled;
+ }
+
+ public void setInflightRepositoryExchangeEnabled(Boolean inflightRepositoryExchangeEnabled) {
+ this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled;
+ }
+
+ @Override
public TypeConverterExists getTypeConverterExists() {
return typeConverterExists;
}
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
index f82068f..7423b80 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
@@ -150,6 +150,8 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr
private Boolean loadTypeConverters;
@XmlAttribute
private Boolean typeConverterStatisticsEnabled;
+ @XmlAttribute
+ private Boolean inflightRepositoryExchangeEnabled;
@XmlAttribute @Metadata(defaultValue = "Override")
private TypeConverterExists typeConverterExists;
@XmlAttribute @Metadata(defaultValue = "WARN")
@@ -920,6 +922,20 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr
}
@Override
+ public Boolean getInflightRepositoryExchangeEnabled() {
+ return inflightRepositoryExchangeEnabled;
+ }
+
+ /**
+ * Sets whether the inflight repository should track each inflight exchange.
+ *
+ * This is by default disabled as there is a very slight performance overhead when enabled.
+ */
+ public void setInflightRepositoryExchangeEnabled(Boolean inflightRepositoryExchangeEnabled) {
+ this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled;
+ }
+
+ @Override
public String getManagementNamePattern() {
return managementNamePattern;
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
index 2c0f5ae..eb678ea 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -135,6 +135,22 @@ public interface InflightRepository extends StaticService {
int size(String routeId);
/**
+ * Whether tracking inflight exchanges is enabled. This is required to be enabled for the browse and oldest APIs to function.
+ *
+ * This is by default disabled as there is a very slight performance overhead when enabled.
+ */
+ boolean isInflightExchangeEnabled();
+
+ /**
+ * Whether tracking inflight exchanges is enabled. This is required to be enabled for the browse and oldest APIs to function.
+ *
+ * This is by default disabled as there is a very slight performance overhead when enabled.
+ *
+ * @param inflightExchangeEnabled whether tracking inflight exchanges is enabled
+ */
+ void setInflightExchangeEnabled(boolean inflightExchangeEnabled);
+
+ /**
* A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight.
*/
Collection<InflightExchange> browse();
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
index 4215e64..3ab887f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
@@ -44,17 +44,27 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class);
+ private final AtomicInteger size = new AtomicInteger();
private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<>();
+ private boolean inflightExchangeEnabled;
@Override
public void add(Exchange exchange) {
- inflight.put(exchange.getExchangeId(), exchange);
+ size.incrementAndGet();
+
+ if (inflightExchangeEnabled) {
+ inflight.put(exchange.getExchangeId(), exchange);
+ }
}
@Override
public void remove(Exchange exchange) {
- inflight.remove(exchange.getExchangeId());
+ size.decrementAndGet();
+
+ if (inflightExchangeEnabled) {
+ inflight.remove(exchange.getExchangeId());
+ }
}
@Override
@@ -75,7 +85,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
public int size() {
- return inflight.size();
+ return size.get();
}
@Override
@@ -95,6 +105,16 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
}
@Override
+ public boolean isInflightExchangeEnabled() {
+ return inflightExchangeEnabled;
+ }
+
+ @Override
+ public void setInflightExchangeEnabled(boolean inflightExchangeEnabled) {
+ this.inflightExchangeEnabled = inflightExchangeEnabled;
+ }
+
+ @Override
public Collection<InflightExchange> browse() {
return browse(null, -1, false);
}
@@ -111,6 +131,10 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
public Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration) {
+ if (!inflightExchangeEnabled) {
+ return Collections.emptyList();
+ }
+
Stream<Exchange> values;
if (fromRouteId == null) {
// all values
@@ -144,6 +168,10 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
public InflightExchange oldest(String fromRouteId) {
+ if (!inflightExchangeEnabled) {
+ return null;
+ }
+
Stream<Exchange> values;
if (fromRouteId == null) {
diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 314ce92..e8165f5 100644
--- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -824,6 +824,8 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
public abstract Boolean getLoadTypeConverters();
+ public abstract Boolean getInflightRepositoryExchangeEnabled();
+
public abstract Boolean getTypeConverterStatisticsEnabled();
public abstract LoggingLevel getTypeConverterExistsLoggingLevel();
@@ -961,6 +963,9 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
if (getTypeConverterStatisticsEnabled() != null) {
context.setTypeConverterStatisticsEnabled(getTypeConverterStatisticsEnabled());
}
+ if (getInflightRepositoryExchangeEnabled() != null) {
+ context.getInflightRepository().setInflightExchangeEnabled(getInflightRepositoryExchangeEnabled());
+ }
if (getTypeConverterExists() != null) {
context.getTypeConverterRegistry().setTypeConverterExists(getTypeConverterExists());
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
index 7af2178..2772200 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
import java.util.Collection;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -27,6 +28,13 @@ import org.junit.Test;
public class InflightRepositoryBrowseFromRouteTest extends ContextTestSupport {
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.getInflightRepository().setInflightExchangeEnabled(true);
+ return context;
+ }
+
@Test
public void testInflight() throws Exception {
assertEquals(0, context.getInflightRepository().browse().size());
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
index 13f15cc..7807fbe 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
import java.util.Collection;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -27,6 +28,13 @@ import org.junit.Test;
public class InflightRepositoryBrowseTest extends ContextTestSupport {
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.getInflightRepository().setInflightExchangeEnabled(true);
+ return context;
+ }
+
@Test
public void testInflight() throws Exception {
assertEquals(0, context.getInflightRepository().browse().size());
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 9b21acc..6b25e36 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -107,6 +107,8 @@ public final class DefaultConfigurationConfigurer {
camelContext.getShutdownStrategy().setShutdownRoutesInReverseOrder(config.isShutdownRoutesInReverseOrder());
camelContext.getShutdownStrategy().setLogInflightExchangesOnTimeout(config.isShutdownLogInflightExchangesOnTimeout());
+ camelContext.getInflightRepository().setInflightExchangeEnabled(config.isInflightRepositoryExchangeEnabled());
+
if (config.getLogDebugMaxChars() != 0) {
camelContext.getGlobalOptions().put(Exchange.LOG_DEBUG_BODY_MAX_CHARS, "" + config.getLogDebugMaxChars());
}
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
index a91b6c5..e938314 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
@@ -34,6 +34,7 @@ public abstract class DefaultConfigurationProperties<T> {
private boolean shutdownNowOnTimeout = true;
private boolean shutdownRoutesInReverseOrder = true;
private boolean shutdownLogInflightExchangesOnTimeout = true;
+ private boolean inflightRepositoryExchangeEnabled;
private String fileConfigurations;
private boolean jmxEnabled = true;
private int producerTemplateCacheSize = 1000;
@@ -190,11 +191,26 @@ public abstract class DefaultConfigurationProperties<T> {
/**
* Sets whether to log information about the inflight Exchanges which are still running
* during a shutdown which didn't complete without the given timeout.
+ *
+ * This requires to enable the option inflightRepositoryExchangeEnabled.
*/
public void setShutdownLogInflightExchangesOnTimeout(boolean shutdownLogInflightExchangesOnTimeout) {
this.shutdownLogInflightExchangesOnTimeout = shutdownLogInflightExchangesOnTimeout;
}
+ public boolean isInflightRepositoryExchangeEnabled() {
+ return inflightRepositoryExchangeEnabled;
+ }
+
+ /**
+ * Sets whether the inflight repository should track each inflight exchange.
+ *
+ * This is by default disabled as there is a very slight performance overhead when enabled.
+ */
+ public void setInflightRepositoryExchangeEnabled(boolean inflightRepositoryExchangeEnabled) {
+ this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled;
+ }
+
public String getFileConfigurations() {
return fileConfigurations;
}
@@ -937,6 +953,8 @@ public abstract class DefaultConfigurationProperties<T> {
/**
* Sets whether to log information about the inflight Exchanges which are still running
* during a shutdown which didn't complete without the given timeout.
+ *
+ * This requires to enable the option inflightRepositoryExchangeEnabled.
*/
public T withShutdownLogInflightExchangesOnTimeout(boolean shutdownLogInflightExchangesOnTimeout) {
this.shutdownLogInflightExchangesOnTimeout = shutdownLogInflightExchangesOnTimeout;
@@ -944,6 +962,16 @@ public abstract class DefaultConfigurationProperties<T> {
}
/**
+ * Sets whether the inflight repository should track each inflight exchange.
+ *
+ * This is by default disabled as there is a very slight performance overhead when enabled.
+ */
+ public T withInflightRepositoryExchangeEnabled(boolean inflightRepositoryExchangeEnabled) {
+ this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled;
+ return (T) this;
+ }
+
+ /**
* Directory to load additional configuration files that contains
* configuration values that takes precedence over any other configuration.
* This can be used to refer to files that may have secret configuration that
diff --git a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
index 29d8e30..3637664 100644
--- a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
+++ b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
@@ -176,6 +176,12 @@
"defaultValue":true
},
{
+ "name":"camel.main.inflight-repository-exchange-enabled",
+ "type":"boolean",
+ "sourceType":"org.apache.camel.main.DefaultConfigurationProperties",
+ "description":"Sets whether the inflight repository should track each inflight exchange. This is by default disabled as there is a very slight performance overhead when enabled."
+ },
+ {
"name":"camel.main.java-routes-exclude-pattern",
"type":"java.lang.String",
"sourceType":"org.apache.camel.main.DefaultConfigurationProperties",
@@ -293,7 +299,7 @@
"name":"camel.main.shutdown-log-inflight-exchanges-on-timeout",
"type":"boolean",
"sourceType":"org.apache.camel.main.DefaultConfigurationProperties",
- "description":"Sets whether to log information about the inflight Exchanges which are still running during a shutdown which didn't complete without the given timeout.",
+ "description":"Sets whether to log information about the inflight Exchanges which are still running during a shutdown which didn't complete without the given timeout. This requires to enable the option inflightRepositoryExchangeEnabled.",
"defaultValue":true
},
{
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
index f3845d7..260cec1 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
@@ -26,6 +26,9 @@ public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean {
@ManagedAttribute(description = "Current size of inflight exchanges.")
int getSize();
+ @ManagedAttribute(description = "Whether tracking inflight exchanges is enabled. This is required to be enabled for the browse operations to function.")
+ boolean isInflightExchangeEnabled();
+
@ManagedOperation(description = "Current size of inflight exchanges which are from the given route.")
int size(String routeId);
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
index 7afa7c8..2891cd3 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
@@ -54,6 +54,11 @@ public class ManagedInflightRepository extends ManagedService implements Managed
}
@Override
+ public boolean isInflightExchangeEnabled() {
+ return inflightRepository.isInflightExchangeEnabled();
+ }
+
+ @Override
public int size(String routeId) {
return inflightRepository.size(routeId);
}
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java
index cfaa7f8..b0166b8 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java
@@ -20,6 +20,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.TabularData;
+import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.junit.Test;
@@ -44,6 +45,8 @@ public class ManagedInflightRepositoryTest extends ManagementTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ context.getInflightRepository().setInflightExchangeEnabled(true);
+
from("direct:start").routeId("foo")
.to("mock:a")
.process(exchange -> {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
index 927742b..bae32e9 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
@@ -32,6 +33,13 @@ import static org.awaitility.Awaitility.await;
public class ManagedInflightStatisticsTest extends ManagementTestSupport {
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.getInflightRepository().setInflightExchangeEnabled(true);
+ return context;
+ }
+
@Test
public void testOldestInflight() throws Exception {
// JMX tests dont work well on AIX CI servers (hangs them)