You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/16 13:31:25 UTC
[camel] branch camel-3.4.x updated: Add reconnect during
FTPOperation delete and rename
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push:
new aae93b5 Add reconnect during FTPOperation delete and rename
aae93b5 is described below
commit aae93b567d160b8e33c496d3f4781f0015d7068d
Author: Lukas Holthof <lu...@sap.com>
AuthorDate: Wed Jul 15 22:09:50 2020 +0200
Add reconnect during FTPOperation delete and rename
Change-Id: Ie97fee70f26792793166c491755665153bc3e746
---
.../camel/component/file/remote/FtpOperations.java | 19 +++-
.../FtpConsumerPostProcessingOnDisconnect.java | 117 +++++++++++++++++++++
.../file/remote/FtpServerTestSupport.java | 17 +++
3 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
index 5837418..0a83906 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
@@ -303,12 +303,13 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
@Override
public boolean deleteFile(String name) throws GenericFileOperationFailedException {
log.debug("Deleting file: {}", name);
-
+
boolean result;
String target = name;
String currentDir = null;
try {
+ reconnectIfNecessary(null);
if (endpoint.getConfiguration().isStepwise()) {
// remember current directory
currentDir = getCurrentDirectory();
@@ -343,6 +344,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
public boolean renameFile(String from, String to) throws GenericFileOperationFailedException {
log.debug("Renaming file: {} to: {}", from, to);
try {
+ reconnectIfNecessary(null);
return client.rename(from, to);
} catch (IOException e) {
throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
@@ -986,5 +988,20 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
public FTPClient getClient() {
return client;
}
+
+ private void reconnectIfNecessary(Exchange exchange) throws GenericFileOperationFailedException { // calls connect() again when the client is not connected or a NOOP probe throws
+ if (isConnected()) {
+ log.trace("sendNoOp to check if connection should be reconnected");
+ try {
+ client.sendNoOp(); // probe only: any value returned here is discarded, so just an IOException triggers the reconnect
+ } catch (IOException e) {
+ log.trace("NoOp to server failed, try to reconnect");
+ connect(endpoint.getConfiguration(), exchange); // exchange may be null: the deleteFile and renameFile call sites in this commit pass null
+ }
+ } else {
+ log.trace("Client is not connected, try to reconnect");
+ connect(endpoint.getConfiguration(), exchange); // same connect call as the failed-probe path above
+ }
+ }
}
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java
new file mode 100644
index 0000000..8bff7dc
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class FtpConsumerPostProcessingOnDisconnect extends FtpServerTestSupport { // checks that delete/move post-processing still happens after the route closes every FTP session mid-exchange
+ private static final String SAMPLE_FILE_NAME_1 = String.format("sample-1-%s.txt",
+ FtpConsumerPostProcessingOnDisconnect.class.getSimpleName()); // i.e. "sample-1-FtpConsumerPostProcessingOnDisconnect.txt"
+ private static final String SAMPLE_FILE_NAME_2 = String.format("sample-2-%s.txt",
+ FtpConsumerPostProcessingOnDisconnect.class.getSimpleName()); // i.e. "sample-2-FtpConsumerPostProcessingOnDisconnect.txt"
+ private static final String SAMPLE_FILE_CHARSET = "iso-8859-1";
+ private static final String SAMPLE_FILE_PAYLOAD = "abc";
+
+ @Test
+ public void testConsumeDelete() throws Exception {
+ if (!canTest()) { // return without asserting anything when canTest() is false
+ return;
+ }
+
+ // prepare sample file to be consumed by FTP consumer
+ createSampleFile(SAMPLE_FILE_NAME_1);
+
+ // Prepare expectations
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD);
+
+ context.getRouteController().startRoute("foo"); // "foo" is the delete=true route, declared with noAutoStartup()
+
+ // Check that expectations are satisfied
+ assertMockEndpointsSatisfied();
+
+ // File is deleted
+ Thread.sleep(250); // NOTE(review): fixed pause, presumably to let the consumer finish the delete after the mock is satisfied - timing-sensitive, confirm it is long enough
+ File deletedFile = new File(FTP_ROOT_DIR + "/" + SAMPLE_FILE_NAME_1);
+ assertFalse(deletedFile.exists(), "File should have been deleted: " + deletedFile);
+ }
+
+ @Test
+ public void testConsumeMove() throws Exception {
+ if (!canTest()) { // return without asserting anything when canTest() is false
+ return;
+ }
+
+ // path the file is expected to be moved to after it is processed
+ String movedFile = FTP_ROOT_DIR + "/.camel/" + SAMPLE_FILE_NAME_2;
+
+ // prepare sample file to be consumed by FTP consumer
+ createSampleFile(SAMPLE_FILE_NAME_2);
+
+ // Prepare expectations
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD);
+ // use mock to assert that the file will be moved there eventually
+ mock.expectedFileExists(movedFile);
+
+ context.getRouteController().startRoute("bar"); // "bar" is the move=.camel route, declared with noAutoStartup()
+
+ // Check that expectations are satisfied
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("ftp://admin@localhost:" + getPort() + "?password=admin&delete=true").routeId("foo").noAutoStartup() // started explicitly by testConsumeDelete
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ disconnectAllSessions(); // disconnect all Sessions on FTP server
+ }
+ }).to("mock:result");
+ from("ftp://admin@localhost:" + getPort() + "?password=admin&noop=false&move=.camel").routeId("bar").noAutoStartup() // started explicitly by testConsumeMove
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ disconnectAllSessions(); // disconnect all Sessions on FTP server
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+
+ private void createSampleFile(String fileName) throws IOException { // writes SAMPLE_FILE_PAYLOAD to FTP_ROOT_DIR/<fileName>
+ File file = new File(FTP_ROOT_DIR + "/" + fileName);
+ FileUtils.write(file, SAMPLE_FILE_PAYLOAD, SAMPLE_FILE_CHARSET); // payload "abc" written with charset iso-8859-1
+ }
+
+}
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
index 0585a29..2e1d772 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
@@ -17,7 +17,10 @@
package org.apache.camel.component.file.remote;
import java.io.File;
+import java.io.IOException;
import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.util.ObjectHelper;
@@ -26,6 +29,9 @@ import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.filesystem.nativefs.NativeFileSystemFactory;
import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.impl.DefaultFtpServer;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.listener.Listener;
import org.apache.ftpserver.listener.ListenerFactory;
import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor;
import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
@@ -138,5 +144,16 @@ public abstract class FtpServerTestSupport extends BaseServerTestSupport {
public void sendFile(String url, Object body, String fileName) {
template.sendBodyAndHeader(url, body, Exchange.FILE_NAME, simple(fileName));
}
+
+ protected void disconnectAllSessions() throws IOException { // closes every session currently active on the embedded FTP server
+ // close the active sessions of every listener (the listeners themselves are not stopped)
+ Map<String, Listener> listeners = ((DefaultFtpServer) ftpServer).getListeners(); // NOTE(review): cast assumes ftpServer is a DefaultFtpServer - confirm where it is created
+ for (Listener listener : listeners.values()) {
+ Set<FtpIoSession> sessions = listener.getActiveSessions();
+ for (FtpIoSession session : sessions) {
+ session.closeNow();
+ }
+ }
+ }
}