You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/05/20 15:02:29 UTC

[camel] branch main updated: CAMEL-18130: camel-file/camel-ftp - Route with last processor setting OUT message causes file/ftp to not be moved correctly when its uow is done. Thanks to Manuel Shenavai for reporting and unit test.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cf18beaa85 CAMEL-18130: camel-file/camel-ftp - Route with last processor setting OUT message causes file/ftp to not be moved correctly when its uow is done. Thanks to Manuel Shenavai for reporting and unit test.
1cf18beaa85 is described below

commit 1cf18beaa859eb2c3ced26be6eb13f93bcd9090e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri May 20 17:02:15 2022 +0200

    CAMEL-18130: camel-file/camel-ftp - Route with last processor setting OUT message causes file/ftp to not be moved correctly when its uow is done. Thanks to Manuel Shenavai for reporting and unit test.
---
 .../apache/camel/component/file/GenericFile.java   |  6 +-
 .../integration/SftpMoveWithOutMessageTest.java    | 95 ++++++++++++++++++++++
 .../component/file/FileMoveWithInMessageTest.java  | 75 +++++++++++++++++
 .../component/file/FileMoveWithOutMessageTest.java | 75 +++++++++++++++++
 4 files changed, 250 insertions(+), 1 deletion(-)

diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index 7b00b9fc0e8..42d52d5cc2f 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -129,7 +129,11 @@ public class GenericFile<T> implements WrappedFile<T>, GenericFileResumable<T> {
         GenericFileMessage<T> msg = new GenericFileMessage<>(exchange, this);
 
         headers = exchange.getMessage().hasHeaders() ? exchange.getMessage().getHeaders() : null;
-        exchange.setMessage(msg);
+        // force storing on IN as that is what Camel expects
+        exchange.setIn(msg);
+        if (exchange.hasOut()) {
+            exchange.setOut(null);
+        }
 
         // preserve any existing (non file) headers, before we re-populate
         // headers
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/integration/SftpMoveWithOutMessageTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/integration/SftpMoveWithOutMessageTest.java
new file mode 100644
index 00000000000..4f3e0505395
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/integration/SftpMoveWithOutMessageTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.sftp.integration;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultMessage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test that the existence of a outMessage in an exchange will not break the move-file post-processing
+ */
+@EnabledIf(value = "org.apache.camel.component.file.remote.services.SftpEmbeddedService#hasRequiredAlgorithms")
+public class SftpMoveWithOutMessageTest extends SftpServerTestSupport {
+
+    @Timeout(value = 30)
+    @Test
+    public void testMoveFileForMultiplePollEnrich() throws Exception {
+        String expected = "Hello World";
+
+        // create two files using regular file
+        template.sendBodyAndHeader("file://" + service.getFtpRootDir(), expected, Exchange.FILE_NAME, "hello1.txt");
+        template.sendBodyAndHeader("file://" + service.getFtpRootDir(), expected, Exchange.FILE_NAME, "hello2.txt");
+
+        ProducerTemplate triggerTemplate = context.createProducerTemplate();
+        triggerTemplate.sendBody("vm:trigger", "");
+
+        File fileInArchive = ftpFile("archive/hello1.txt").toFile();
+        await().atMost(15, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(fileInArchive.exists(), "The file should exist in the archive folder"));
+
+        File fileInArchive2 = ftpFile("archive/hello2.txt").toFile();
+        await().atMost(15, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(fileInArchive2.exists(), "The file should exist in the archive folder"));
+
+        File originalFile = ftpFile("hello1.txt").toFile();
+        await().atMost(15, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertFalse(originalFile.exists(), "The file should have been moved"));
+
+        File originalFile2 = ftpFile("hello2.txt").toFile();
+        await().atMost(15, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertFalse(originalFile2.exists(), "The file should have been moved"));
+    }
+
+    @Override
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        TestProcessor processor = new TestProcessor();
+        return new RouteBuilder[] { new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                        .pollEnrich(
+                                "sftp://localhost:{{ftp.server.port}}/{{ftp.root.dir}}?username=admin&password=admin&delay=10000&disconnect=true&move=archive")
+                        .pollEnrich(
+                                "sftp://localhost:{{ftp.server.port}}/{{ftp.root.dir}}?username=admin&password=admin&delay=10000&disconnect=true&move=archive")
+                        .process(processor);
+            }
+        } };
+    }
+
+    private static class TestProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            DefaultMessage msg = new DefaultMessage(exchange);
+            msg.setBody(exchange.getIn().getBody());
+            msg.setHeaders(exchange.getIn().getHeaders());
+            exchange.setOut(msg); // uses OUT on purpose for testing
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileMoveWithInMessageTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileMoveWithInMessageTest.java
new file mode 100644
index 00000000000..0ca9b669875
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileMoveWithInMessageTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultMessage;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FileMoveWithInMessageTest extends ContextTestSupport {
+
+    @Test
+    public void testMove() throws Exception {
+        String uri = fileUri();
+        template.sendBodyAndHeader(uri, "Hello World1", Exchange.FILE_NAME, "hello1.txt");
+        template.sendBodyAndHeader(uri, "Hello World2", Exchange.FILE_NAME, "hello2.txt");
+
+        // trigger
+        template.sendBody("vm:trigger", "");
+
+        File file1 = new File(testDirectory().toFile(), "archive/hello1.txt");
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(file1.exists(), "The file should exist in the archive folder"));
+
+        File file2 = new File(testDirectory().toFile(), "archive/hello2.txt");
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(file2.exists(), "The file should exist in the archive folder"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                        .pollEnrich(fileUri() + "?move=archive")
+                        .pollEnrich(fileUri() + "?move=archive")
+                        .process(new TestProcessor());
+            }
+        };
+    }
+
+    private static class TestProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            DefaultMessage msg = new DefaultMessage(exchange);
+            msg.setBody(exchange.getIn().getBody());
+            msg.setHeaders(exchange.getIn().getHeaders());
+            exchange.setIn(msg);
+        }
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileMoveWithOutMessageTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileMoveWithOutMessageTest.java
new file mode 100644
index 00000000000..87adfe3c2b5
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileMoveWithOutMessageTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultMessage;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FileMoveWithOutMessageTest extends ContextTestSupport {
+
+    @Test
+    public void testMove() throws Exception {
+        String uri = fileUri();
+        template.sendBodyAndHeader(uri, "Hello World1", Exchange.FILE_NAME, "hello1.txt");
+        template.sendBodyAndHeader(uri, "Hello World2", Exchange.FILE_NAME, "hello2.txt");
+
+        // trigger
+        template.sendBody("vm:trigger", "");
+
+        File file1 = new File(testDirectory().toFile(), "archive/hello1.txt");
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(file1.exists(), "The file should exist in the archive folder"));
+
+        File file2 = new File(testDirectory().toFile(), "archive/hello2.txt");
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(file2.exists(), "The file should exist in the archive folder"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                        .pollEnrich(fileUri() + "?move=archive")
+                        .pollEnrich(fileUri() + "?move=archive")
+                        .process(new TestProcessor());
+            }
+        };
+    }
+
+    private static class TestProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            DefaultMessage msg = new DefaultMessage(exchange);
+            msg.setBody(exchange.getIn().getBody());
+            msg.setHeaders(exchange.getIn().getHeaders());
+            exchange.setOut(msg); // uses OUT on purpose for testing
+        }
+    }
+}