You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/12/12 09:32:07 UTC

[camel] 01/02: CAMEL-12991 setting processStrategy for sftp/ftp endpoints (#2665)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.23.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 926eb1fe8b4a14e638d07c02c22c5dd20e5612f6
Author: swalendzik <se...@gmail.com>
AuthorDate: Wed Dec 12 10:19:16 2018 +0100

    CAMEL-12991 setting processStrategy for sftp/ftp endpoints (#2665)
---
 .../camel/component/file/remote/FtpEndpoint.java   |  2 +-
 .../camel/component/file/remote/SftpEndpoint.java  |  2 +-
 .../remote/FtpConsumerProcessStrategyTest.java     | 90 +++++++++++++++++++++
 .../sftp/SftpConsumerProcessStrategyTest.java      | 93 ++++++++++++++++++++++
 4 files changed, 185 insertions(+), 2 deletions(-)

diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
index 22a72ca..51a590c 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
@@ -96,7 +96,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile>
     @Override
     protected RemoteFileConsumer<FTPFile> buildConsumer(Processor processor) {
         try {
-            return new FtpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy());
+            return new FtpConsumer(this, processor, createRemoteFileOperations(), processStrategy != null ? processStrategy : createGenericFileStrategy());
         } catch (Exception e) {
             throw new FailedToCreateConsumerException(this, e);
         }
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
index 069dfa7..41456f5 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
@@ -62,7 +62,7 @@ public class SftpEndpoint extends RemoteFileEndpoint<SftpRemoteFile> {
 
     @Override
     protected RemoteFileConsumer<SftpRemoteFile> buildConsumer(Processor processor) {
-        return new SftpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy());
+        return new SftpConsumer(this, processor, createRemoteFileOperations(), processStrategy != null ? processStrategy : createGenericFileStrategy());
     }
 
     protected GenericFileProducer<SftpRemoteFile> buildProducer() {
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java
new file mode 100644
index 0000000..13b2c45
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class FtpConsumerProcessStrategyTest extends FtpServerTestSupport {
+
+    private MyStrategy myStrategy;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        myStrategy = new MyStrategy();
+        jndi.bind("myStrategy", myStrategy);
+        return jndi;
+    }
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?password=admin&processStrategy=#myStrategy";
+    }
+
+    @Test
+    public void testFtpConsume() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        sendFile(getFtpUrl(), "Hello World", "hello.txt");
+
+        String out = consumer.receiveBody(getFtpUrl(), 5000, String.class);
+        assertNotNull(out);
+        assertTrue(out.startsWith("Hello World"));
+        assertEquals("Begin should have been invoked 1 times", 1, myStrategy.getInvoked());
+    }
+
+    private static class MyStrategy implements GenericFileProcessStrategy {
+
+        private volatile int invoked;
+
+        @Override
+        public void prepareOnStartup(GenericFileOperations operations, GenericFileEndpoint endpoint) throws Exception {
+            //noop
+        }
+
+        @Override
+        public boolean begin(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            return true;
+        }
+
+        @Override
+        public void abort(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            //noop
+        }
+
+        @Override
+        public void commit(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            invoked++;
+        }
+
+        @Override
+        public void rollback(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            //noop
+        }
+
+        int getInvoked() {
+            return invoked;
+        }
+    }
+}
\ No newline at end of file
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java
new file mode 100644
index 0000000..d314728
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.sftp;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class SftpConsumerProcessStrategyTest extends SftpServerTestSupport {
+
+    private MyStrategy myStrategy;
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        myStrategy = new MyStrategy();
+        jndi.bind("myStrategy", myStrategy);
+        return jndi;
+    }
+
+    @Test
+    public void testSftpConsume() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        // create file using regular file
+        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        String out = consumer.receiveBody("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&processStrategy=#myStrategy", 5000, String.class);
+        assertNotNull(out);
+        // Apache SSHD appends \u0000 at last byte in retrieved file
+        assertTrue(out.startsWith("Hello World"));
+        assertEquals("CustomProcessStrategy should have been invoked 1 times", 1, myStrategy.getInvoked());
+    }
+
+    private static class MyStrategy implements GenericFileProcessStrategy {
+
+        private volatile int invoked;
+
+        @Override
+        public void prepareOnStartup(GenericFileOperations operations, GenericFileEndpoint endpoint) throws Exception {
+            //noop
+        }
+
+        @Override
+        public boolean begin(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            return true;
+        }
+
+        @Override
+        public void abort(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            //noop
+        }
+
+        @Override
+        public void commit(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            invoked++;
+        }
+
+        @Override
+        public void rollback(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+            //noop
+        }
+
+        int getInvoked() {
+            return invoked;
+        }
+    }
+}