You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/05/12 09:29:36 UTC

[camel] 01/02: CAMEL-18087: camel-core - Enable stream caching by default. (#7586)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 893b8cde2457642c2c55592253f1468dc006e89c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu May 12 11:28:37 2022 +0200

    CAMEL-18087: camel-core - Enable stream caching by default. (#7586)
---
 .../netty/http/DefaultNettyHttpBinding.java        |  3 +++
 .../component/netty/http/NettyHttpProducer.java    |  4 +++
 .../NettyHttpStreamDisabledStreamCachingTest.java  | 31 ++++++++++++++++++++++
 .../java/org/apache/camel/ExtendedExchange.java    | 10 +++++++
 .../camel/impl/engine/CamelInternalProcessor.java  |  4 +++
 .../org/apache/camel/support/AbstractExchange.java | 12 +++++++++
 .../camel/support/DefaultPooledExchange.java       |  1 +
 7 files changed, 65 insertions(+)

diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 2819d3d5290..b536ed23336 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -104,6 +104,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             // for proxy use case pass the request body buffer directly to the response to avoid additional processing
             // we need to retain it so that the request can be released and we can keep the content
             answer.setBody(request.content().retain());
+            answer.getExchange().adapt(ExtendedExchange.class).setStreamCacheDisabled(true);
             exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
                 @Override
                 public void onDone(Exchange exchange) {
@@ -321,6 +322,8 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
         if (configuration.isDisableStreamCache() || configuration.isHttpProxy()) {
             // keep the body as is, and use type converters
             answer.setBody(response.content());
+            // turn off stream cache as we use the raw body as-is
+            answer.getExchange().adapt(ExtendedExchange.class).setStreamCacheDisabled(true);
         } else {
             // stores as byte array as the netty ByteBuf will be freed when the producer is done, and then we can no longer access the message body
             response.retain();
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
index 42f83caa3da..e4f40352560 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
@@ -78,6 +78,10 @@ public class NettyHttpProducer extends NettyProducer {
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (getConfiguration().isDisableStreamCache() || getConfiguration().isHttpProxy()) {
+            exchange.adapt(ExtendedExchange.class).setStreamCacheDisabled(true);
+        }
+
         return super.process(exchange, new NettyHttpProducerCallback(exchange, callback, getConfiguration()));
     }
 
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamDisabledStreamCachingTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamDisabledStreamCachingTest.java
new file mode 100644
index 00000000000..6dcfc5267e7
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamDisabledStreamCachingTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.CamelContext;
+
+public class NettyHttpStreamDisabledStreamCachingTest extends NettyHttpStreamTest {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.setStreamCaching(false);
+        return context;
+    }
+
+}
+
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index ce49eb348ab..929b7c0a943 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -116,6 +116,16 @@ public interface ExtendedExchange extends Exchange {
      */
     void setHistoryNodeSource(String historyNodeSource);
 
+    /**
+     * Whether stream caching is disabled on the given exchange.
+     */
+    boolean isStreamCacheDisabled();
+
+    /**
+     * Forces stream caching to be disabled on this exchange, which some components need to do in special use-cases.
+     */
+    void setStreamCacheDisabled(boolean streamCacheDisabled);
+
     /**
      * Sets whether the exchange is routed in a transaction.
      */
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 031bec38fdb..6a031a1f402 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -938,6 +938,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
             }
             boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null;
             if (!failed) {
+                boolean disabled = exchange.adapt(ExtendedExchange.class).isStreamCacheDisabled();
+                if (disabled) {
+                    return null;
+                }
                 try {
                     // cache the body and if we could do that replace it as the new body
                     StreamCache sc = strategy.cache(exchange);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index b5cab563c28..6e480f18227 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -86,6 +86,7 @@ class AbstractExchange implements ExtendedExchange {
     boolean interrupted;
     boolean interruptable = true;
     boolean redeliveryExhausted;
+    boolean streamCacheDisabled;
     Boolean errorHandlerHandled;
     AsyncCallback defaultConsumerCallback; // optimize (do not reset)
     Map<String, SafeCopyProperty> safeCopyProperties;
@@ -154,6 +155,7 @@ class AbstractExchange implements ExtendedExchange {
         exchange.setNotifyEvent(notifyEvent);
         exchange.setRedeliveryExhausted(redeliveryExhausted);
         exchange.setErrorHandlerHandled(errorHandlerHandled);
+        exchange.setStreamCacheDisabled(streamCacheDisabled);
 
         // copy properties after body as body may trigger lazy init
         if (hasProperties()) {
@@ -870,6 +872,16 @@ class AbstractExchange implements ExtendedExchange {
         this.errorHandlerHandled = errorHandlerHandled;
     }
 
+    @Override
+    public boolean isStreamCacheDisabled() {
+        return streamCacheDisabled;
+    }
+
+    @Override
+    public void setStreamCacheDisabled(boolean streamCacheDisabled) {
+        this.streamCacheDisabled = streamCacheDisabled;
+    }
+
     /**
      * Configures the message after it has been set on the exchange
      */
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index c7720538f28..a8aa4574b67 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -121,6 +121,7 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
             this.interruptable = true;
             this.redeliveryExhausted = false;
             this.errorHandlerHandled = null;
+            this.streamCacheDisabled = false;
 
             if (onDone != null) {
                 onDone.onDone(this);