You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 12:11:35 UTC
[2/2] camel git commit: CAMEL-7565: SFTP using PollEnrich with
disconnect=true and delete=true does NOT delete the file.
CAMEL-7565: SFTP using PollEnrich with disconnect=true and delete=true does NOT delete the file.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/adf655d5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/adf655d5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/adf655d5
Branch: refs/heads/camel-2.15.x
Commit: adf655d54abd0fb67326f4b50d35d5ee2676af7e
Parents: 8e9b488
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 12:12:36 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 12:13:25 2015 +0100
----------------------------------------------------------------------
.../file/remote/RemoteFileConsumer.java | 39 ++++++++--
...nrichConsumeWithDisconnectAndDeleteTest.java | 78 ++++++++++++++++++++
2 files changed, 111 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/adf655d5/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index bb418e2..4a4ba2d 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -20,17 +20,19 @@ import java.io.IOException;
import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.Ordered;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileConsumer;
import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.support.SynchronizationAdapter;
/**
* Base class for remote file consumers.
*/
public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
- protected boolean loggedIn;
- protected boolean loggedInWarning;
+ protected transient boolean loggedIn;
+ protected transient boolean loggedInWarning;
public RemoteFileConsumer(RemoteFileEndpoint<T> endpoint, Processor processor, RemoteFileOperations<T> operations) {
super(endpoint, processor, operations);
@@ -87,10 +89,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
if (log.isTraceEnabled()) {
log.trace("postPollCheck on " + getEndpoint().getConfiguration().remoteServerInformation());
}
- if (getEndpoint().isDisconnect()) {
- log.trace("postPollCheck disconnect from: {}", getEndpoint());
- disconnect();
- }
}
@Override
@@ -98,6 +96,35 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
// mark the exchange to be processed synchronously as the ftp client is not thread safe
// and we must execute the callbacks in the same thread as this consumer
exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
+
+ // defer disconnect til the UoW is complete - but only the last exchange from the batch should do that
+ boolean isLast = exchange.getProperty(Exchange.BATCH_COMPLETE, true, Boolean.class);
+ if (isLast && getEndpoint().isDisconnect()) {
+ exchange.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ log.trace("postPollCheck disconnect from: {}", getEndpoint());
+ disconnect();
+ }
+
+ @Override
+ public boolean allowHandover() {
+ // do not allow handover as we must execute the callbacks in the same thread as this consumer
+ return false;
+ }
+
+ @Override
+ public int getOrder() {
+ // we want to disconnect last
+ return Ordered.LOWEST;
+ }
+
+ public String toString() {
+ return "Disconnect";
+ }
+ });
+ }
+
return super.processExchange(exchange);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/adf655d5/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
new file mode 100644
index 0000000..f547b15
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.remote.sftp.SftpServerTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class SftpPollEnrichConsumeWithDisconnectAndDeleteTest extends SftpServerTestSupport {
+
+ @Test
+ public void testSftpSimpleConsume() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ String expected = "Hello World";
+
+ // create file using regular file
+ template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt");
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+ mock.expectedBodiesReceived(expected);
+
+ ProducerTemplate triggerTemplate = context.createProducerTemplate();
+ triggerTemplate.sendBody("vm:trigger", "");
+
+ assertMockEndpointsSatisfied();
+
+ long startFileDeletionCheckTime = System.currentTimeMillis();
+ boolean fileExists = true;
+ while (System.currentTimeMillis() - startFileDeletionCheckTime < 3000) { // wait up to 3000ms for file to be deleted
+ File file = new File(FTP_ROOT_DIR + "/hello.txt");
+ fileExists = file.exists();
+
+ if (fileExists) {
+ log.info("Will check that file has been deleted again in 200ms");
+ Thread.sleep(200);
+ }
+ }
+
+ assertFalse("The file should have been deleted", fileExists);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("vm:trigger")
+ .pollEnrich("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&delete=true")
+ .routeId("foo")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file