You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/12/12 10:45:25 UTC
[camel] 04/05: CAMEL-12991 setting processStrategy for sftp/ftp
endpoints (#2665)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 64faf580bfbd31957b9a977fdb1b8a9ba530359e
Author: swalendzik <se...@gmail.com>
AuthorDate: Wed Dec 12 10:19:16 2018 +0100
CAMEL-12991 setting processStrategy for sftp/ftp endpoints (#2665)
---
.../camel/component/file/remote/FtpEndpoint.java | 2 +-
.../camel/component/file/remote/SftpEndpoint.java | 2 +-
.../remote/FtpConsumerProcessStrategyTest.java | 90 +++++++++++++++++++++
.../sftp/SftpConsumerProcessStrategyTest.java | 93 ++++++++++++++++++++++
4 files changed, 185 insertions(+), 2 deletions(-)
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
index e7ce84d..0c0b61f 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
@@ -96,7 +96,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile>
@Override
protected RemoteFileConsumer<FTPFile> buildConsumer(Processor processor) {
try {
- return new FtpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy());
+ return new FtpConsumer(this, processor, createRemoteFileOperations(), processStrategy != null ? processStrategy : createGenericFileStrategy());
} catch (Exception e) {
throw new FailedToCreateConsumerException(this, e);
}
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
index 069dfa7..41456f5 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
@@ -62,7 +62,7 @@ public class SftpEndpoint extends RemoteFileEndpoint<SftpRemoteFile> {
@Override
protected RemoteFileConsumer<SftpRemoteFile> buildConsumer(Processor processor) {
- return new SftpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy());
+ return new SftpConsumer(this, processor, createRemoteFileOperations(), processStrategy != null ? processStrategy : createGenericFileStrategy());
}
protected GenericFileProducer<SftpRemoteFile> buildProducer() {
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java
new file mode 100644
index 0000000..13b2c45
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class FtpConsumerProcessStrategyTest extends FtpServerTestSupport {
+
+ private MyStrategy myStrategy;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ myStrategy = new MyStrategy();
+ jndi.bind("myStrategy", myStrategy);
+ return jndi;
+ }
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?password=admin&processStrategy=#myStrategy";
+ }
+
+ @Test
+ public void testFtpConsume() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ sendFile(getFtpUrl(), "Hello World", "hello.txt");
+
+ String out = consumer.receiveBody(getFtpUrl(), 5000, String.class);
+ assertNotNull(out);
+ assertTrue(out.startsWith("Hello World"));
+ assertEquals("Begin should have been invoked 1 times", 1, myStrategy.getInvoked());
+ }
+
+ private static class MyStrategy implements GenericFileProcessStrategy {
+
+ private volatile int invoked;
+
+ @Override
+ public void prepareOnStartup(GenericFileOperations operations, GenericFileEndpoint endpoint) throws Exception {
+ //noop
+ }
+
+ @Override
+ public boolean begin(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ return true;
+ }
+
+ @Override
+ public void abort(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ //noop
+ }
+
+ @Override
+ public void commit(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ invoked++;
+ }
+
+ @Override
+ public void rollback(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ //noop
+ }
+
+ int getInvoked() {
+ return invoked;
+ }
+ }
+}
\ No newline at end of file
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java
new file mode 100644
index 0000000..d314728
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.sftp;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class SftpConsumerProcessStrategyTest extends SftpServerTestSupport {
+
+ private MyStrategy myStrategy;
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ myStrategy = new MyStrategy();
+ jndi.bind("myStrategy", myStrategy);
+ return jndi;
+ }
+
+ @Test
+ public void testSftpConsume() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ // create file using regular file
+ template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+ String out = consumer.receiveBody("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&processStrategy=#myStrategy", 5000, String.class);
+ assertNotNull(out);
+ // Apache SSHD appends \u0000 at last byte in retrieved file
+ assertTrue(out.startsWith("Hello World"));
+ assertEquals("CustomProcessStrategy should have been invoked 1 times", 1, myStrategy.getInvoked());
+ }
+
+ private static class MyStrategy implements GenericFileProcessStrategy {
+
+ private volatile int invoked;
+
+ @Override
+ public void prepareOnStartup(GenericFileOperations operations, GenericFileEndpoint endpoint) throws Exception {
+ //noop
+ }
+
+ @Override
+ public boolean begin(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ return true;
+ }
+
+ @Override
+ public void abort(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ //noop
+ }
+
+ @Override
+ public void commit(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ invoked++;
+ }
+
+ @Override
+ public void rollback(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception {
+ //noop
+ }
+
+ int getInvoked() {
+ return invoked;
+ }
+ }
+}