You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/09 07:15:23 UTC

[camel] branch master updated: Stream caching should be able to init with camel-quarkus out of the box

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 087b5a7  Stream caching should be able to init with camel-quarkus out of the box
087b5a7 is described below

commit 087b5a7db18c8070e37b119cb9db0513e3dd0865
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Aug 9 09:15:00 2019 +0200

    Stream caching should be able to init with camel-quarkus out of the box
---
 .../impl/engine/DefaultStreamCachingStrategy.java  |  34 +++--
 .../StreamCachingSpoolDirectoryQuarkusTest.java    | 147 +++++++++++++++++++++
 2 files changed, 173 insertions(+), 8 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
index 9d0ccb4..8fd7663 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
@@ -22,6 +22,7 @@ import java.lang.management.MemoryMXBean;
 import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Matcher;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -222,15 +223,32 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came
     }
 
     protected String resolveSpoolDirectory(String path) {
-        String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false);
-        if (name != null) {
-            name = customResolveManagementName(name);
-        }
-        // and then check again with invalid check to ensure all ## is resolved
-        if (name != null) {
-            name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true);
+        if (camelContext.getManagementNameStrategy() != null) {
+            String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false);
+            if (name != null) {
+                name = customResolveManagementName(name);
+            }
+            // and then resolve once more with the invalid check enabled to ensure every #token# has been resolved
+            if (name != null) {
+                name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true);
+            }
+            return name;
+        } else {
+            return defaultManagementName(path); // no management name strategy (e.g. camel-quarkus): resolve the tokens locally
         }
-        return name;
+    }
+
+    // fallback resolver: substitutes #camelId# and #name# with the context name, then applies the custom replacements
+    protected String defaultManagementName(String path) {
+        // quote the context name so that '$' and '\' in it are inserted literally by replaceFirst
+        String contextName = Matcher.quoteReplacement(camelContext.getName());
+
+        // only the first occurrence of each token is substituted
+        String resolved = path
+            .replaceFirst("#camelId#", contextName)
+            .replaceFirst("#name#", contextName);
+        // then apply the custom replacements
+        return customResolveManagementName(resolved);
     }
 
     protected String customResolveManagementName(String pattern) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingSpoolDirectoryQuarkusTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingSpoolDirectoryQuarkusTest.java
new file mode 100644
index 0000000..7ba40f8
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingSpoolDirectoryQuarkusTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.ManagementNameStrategy;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that stream caching spools to disk when the CamelContext has no ManagementNameStrategy (as on camel-quarkus).
+ */
+public class StreamCachingSpoolDirectoryQuarkusTest extends ContextTestSupport {
+    // single spool directory shared by the cleanup in setUp, the strategy configuration and the file-count assertion
+    private static final String SPOOL_DIR = "target/cachedir";
+    private final MyCustomSpoolRule spoolRule = new MyCustomSpoolRule();
+
+    private static class MyCamelContext extends DefaultCamelContext {
+        public MyCamelContext(boolean init) {
+            super(init);
+        }
+
+        @Override
+        public ManagementNameStrategy getManagementNameStrategy() {
+            // simulate quarkus, which has no management at all and therefore no management name strategy
+            return null;
+        }
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory(SPOOL_DIR);
+        super.setUp();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = new MyCamelContext(false);
+        context.disableJMX();
+        context.setRegistry(createRegistry());
+        context.setLoadTypeConverters(isLoadTypeConverters());
+        return context;
+    }
+
+    @Test
+    public void testByteArrayInputStream() throws Exception {
+        getMockEndpoint("mock:english").expectedBodiesReceived("<hello/>");
+        getMockEndpoint("mock:dutch").expectedBodiesReceived("<hallo/>");
+        getMockEndpoint("mock:german").expectedBodiesReceived("<hallo/>");
+        getMockEndpoint("mock:french").expectedBodiesReceived("<hellos/>");
+
+        // wrap in MyInputStream because a plain ByteArrayInputStream is optimized to reuse its in-memory buffer
+        // and therefore does not need to be spooled to disk
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hello/>".getBytes())));
+
+        spoolRule.setSpool(true);
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hallo/>".getBytes())));
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hellos/>".getBytes())));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private static final class MyInputStream extends FilterInputStream {
+        private MyInputStream(InputStream in) {
+            super(in);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getStreamCachingStrategy().setSpoolDirectory(SPOOL_DIR);
+                context.getStreamCachingStrategy().addSpoolRule(spoolRule);
+                context.getStreamCachingStrategy().setAnySpoolRules(true);
+                context.setStreamCaching(true);
+
+                from("direct:a")
+                    .choice()
+                        .when(xpath("//hello")).to("mock:english")
+                        .when(xpath("//hallo")).to("mock:dutch", "mock:german")
+                        .otherwise().to("mock:french")
+                    .end()
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // once spooling is switched on, exactly one spool file must exist (list() is null if the dir is missing)
+                            if (spoolRule.isSpool()) {
+                                String[] names = new File(SPOOL_DIR).list();
+                                assertEquals("There should be a cached spool file", 1, names == null ? 0 : names.length);
+                            }
+                        }
+                    });
+            }
+        };
+    }
+
+    private static final class MyCustomSpoolRule implements StreamCachingStrategy.SpoolRule {
+        private volatile boolean spool;
+
+        @Override
+        public boolean shouldSpoolCache(long length) {
+            return spool;
+        }
+
+        public boolean isSpool() {
+            return spool;
+        }
+
+        public void setSpool(boolean spool) {
+            this.spool = spool;
+        }
+
+        @Override
+        public String toString() {
+            return "MyCustomSpoolRule";
+        }
+    }
+}
+