You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/05/12 09:29:36 UTC
[camel] 01/02: CAMEL-18087: camel-core - Enable stream caching by default. (#7586)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 893b8cde2457642c2c55592253f1468dc006e89c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu May 12 11:28:37 2022 +0200
CAMEL-18087: camel-core - Enable stream caching by default. (#7586)
---
.../netty/http/DefaultNettyHttpBinding.java | 3 +++
.../component/netty/http/NettyHttpProducer.java | 4 +++
.../NettyHttpStreamDisabledStreamCachingTest.java | 31 ++++++++++++++++++++++
.../java/org/apache/camel/ExtendedExchange.java | 10 +++++++
.../camel/impl/engine/CamelInternalProcessor.java | 4 +++
.../org/apache/camel/support/AbstractExchange.java | 12 +++++++++
.../camel/support/DefaultPooledExchange.java | 1 +
7 files changed, 65 insertions(+)
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 2819d3d5290..b536ed23336 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -104,6 +104,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
// for proxy use case pass the request body buffer directly to the response to avoid additional processing
// we need to retain it so that the request can be released and we can keep the content
answer.setBody(request.content().retain());
+ answer.getExchange().adapt(ExtendedExchange.class).setStreamCacheDisabled(true);
exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
@Override
public void onDone(Exchange exchange) {
@@ -321,6 +322,8 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
if (configuration.isDisableStreamCache() || configuration.isHttpProxy()) {
// keep the body as is, and use type converters
answer.setBody(response.content());
+ // turn off stream cache as we use the raw body as-is
+ answer.getExchange().adapt(ExtendedExchange.class).setStreamCacheDisabled(true);
} else {
// stores as byte array as the netty ByteBuf will be freed when the producer is done, and then we can no longer access the message body
response.retain();
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
index 42f83caa3da..e4f40352560 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
@@ -78,6 +78,10 @@ public class NettyHttpProducer extends NettyProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (getConfiguration().isDisableStreamCache() || getConfiguration().isHttpProxy()) {
+ exchange.adapt(ExtendedExchange.class).setStreamCacheDisabled(true);
+ }
+
return super.process(exchange, new NettyHttpProducerCallback(exchange, callback, getConfiguration()));
}
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamDisabledStreamCachingTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamDisabledStreamCachingTest.java
new file mode 100644
index 00000000000..6dcfc5267e7
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamDisabledStreamCachingTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.CamelContext;
+
+public class NettyHttpStreamDisabledStreamCachingTest extends NettyHttpStreamTest {
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.setStreamCaching(false);
+ return context;
+ }
+
+}
+
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index ce49eb348ab..929b7c0a943 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -116,6 +116,16 @@ public interface ExtendedExchange extends Exchange {
*/
void setHistoryNodeSource(String historyNodeSource);
+ /**
+ * Is stream caching disabled on the given exchange
+ */
+ boolean isStreamCacheDisabled();
+
+ /**
+ * Used to force disabling stream caching which some components can do in special use-cases.
+ */
+ void setStreamCacheDisabled(boolean streamCacheDisabled);
+
/**
* Sets whether the exchange is routed in a transaction.
*/
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 031bec38fdb..6a031a1f402 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -938,6 +938,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
}
boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null;
if (!failed) {
+ boolean disabled = exchange.adapt(ExtendedExchange.class).isStreamCacheDisabled();
+ if (disabled) {
+ return null;
+ }
try {
// cache the body and if we could do that replace it as the new body
StreamCache sc = strategy.cache(exchange);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index b5cab563c28..6e480f18227 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -86,6 +86,7 @@ class AbstractExchange implements ExtendedExchange {
boolean interrupted;
boolean interruptable = true;
boolean redeliveryExhausted;
+ boolean streamCacheDisabled;
Boolean errorHandlerHandled;
AsyncCallback defaultConsumerCallback; // optimize (do not reset)
Map<String, SafeCopyProperty> safeCopyProperties;
@@ -154,6 +155,7 @@ class AbstractExchange implements ExtendedExchange {
exchange.setNotifyEvent(notifyEvent);
exchange.setRedeliveryExhausted(redeliveryExhausted);
exchange.setErrorHandlerHandled(errorHandlerHandled);
+ exchange.setStreamCacheDisabled(streamCacheDisabled);
// copy properties after body as body may trigger lazy init
if (hasProperties()) {
@@ -870,6 +872,16 @@ class AbstractExchange implements ExtendedExchange {
this.errorHandlerHandled = errorHandlerHandled;
}
+ @Override
+ public boolean isStreamCacheDisabled() {
+ return streamCacheDisabled;
+ }
+
+ @Override
+ public void setStreamCacheDisabled(boolean streamCacheDisabled) {
+ this.streamCacheDisabled = streamCacheDisabled;
+ }
+
/**
* Configures the message after it has been set on the exchange
*/
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index c7720538f28..a8aa4574b67 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -121,6 +121,7 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
this.interruptable = true;
this.redeliveryExhausted = false;
this.errorHandlerHandled = null;
+ this.streamCacheDisabled = false;
if (onDone != null) {
onDone.onDone(this);