You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 12:11:35 UTC

[2/2] camel git commit: CAMEL-7565: SFTP using PollEnrich with disconnect=true and delete=true does NOT delete the file.

CAMEL-7565: SFTP using PollEnrich with disconnect=true and delete=true does NOT delete the file.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/adf655d5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/adf655d5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/adf655d5

Branch: refs/heads/camel-2.15.x
Commit: adf655d54abd0fb67326f4b50d35d5ee2676af7e
Parents: 8e9b488
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 12:12:36 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 12:13:25 2015 +0100

----------------------------------------------------------------------
 .../file/remote/RemoteFileConsumer.java         | 39 ++++++++--
 ...nrichConsumeWithDisconnectAndDeleteTest.java | 78 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/adf655d5/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index bb418e2..4a4ba2d 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -20,17 +20,19 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Ordered;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileConsumer;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.support.SynchronizationAdapter;
 
 /**
  * Base class for remote file consumers.
  */
 public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
-    protected boolean loggedIn;
-    protected boolean loggedInWarning;
+    protected transient boolean loggedIn;
+    protected transient boolean loggedInWarning;
 
     public RemoteFileConsumer(RemoteFileEndpoint<T> endpoint, Processor processor, RemoteFileOperations<T> operations) {
         super(endpoint, processor, operations);
@@ -87,10 +89,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
         if (log.isTraceEnabled()) {
             log.trace("postPollCheck on " + getEndpoint().getConfiguration().remoteServerInformation());
         }
-        if (getEndpoint().isDisconnect()) {
-            log.trace("postPollCheck disconnect from: {}", getEndpoint());
-            disconnect();
-        }
     }
 
     @Override
@@ -98,6 +96,35 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
         // mark the exchange to be processed synchronously as the ftp client is not thread safe
         // and we must execute the callbacks in the same thread as this consumer
         exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
+
+        // defer disconnect til the UoW is complete - but only the last exchange from the batch should do that
+        boolean isLast = exchange.getProperty(Exchange.BATCH_COMPLETE, true, Boolean.class);
+        if (isLast && getEndpoint().isDisconnect()) {
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onDone(Exchange exchange) {
+                    log.trace("postPollCheck disconnect from: {}", getEndpoint());
+                    disconnect();
+                }
+
+                @Override
+                public boolean allowHandover() {
+                    // do not allow handover as we must execute the callbacks in the same thread as this consumer
+                    return false;
+                }
+
+                @Override
+                public int getOrder() {
+                    // we want to disconnect last
+                    return Ordered.LOWEST;
+                }
+
+                public String toString() {
+                    return "Disconnect";
+                }
+            });
+        }
+
         return super.processExchange(exchange);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/adf655d5/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
new file mode 100644
index 0000000..f547b15
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.remote.sftp.SftpServerTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class SftpPollEnrichConsumeWithDisconnectAndDeleteTest extends SftpServerTestSupport {
+
+    @Test
+    public void testSftpSimpleConsume() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        String expected = "Hello World";
+
+        // create file using regular file
+        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+        mock.expectedBodiesReceived(expected);
+
+        ProducerTemplate triggerTemplate = context.createProducerTemplate();
+        triggerTemplate.sendBody("vm:trigger", "");
+
+        assertMockEndpointsSatisfied();
+
+        long startFileDeletionCheckTime = System.currentTimeMillis();
+        boolean fileExists = true;
+        while (System.currentTimeMillis() - startFileDeletionCheckTime < 3000) {  // wait up to 3000ms for file to be deleted
+            File file = new File(FTP_ROOT_DIR + "/hello.txt");
+            fileExists = file.exists();
+
+            if (fileExists) {
+                log.info("Will check that file has been deleted again in 200ms");
+                Thread.sleep(200);
+            }
+        }
+
+        assertFalse("The file should have been deleted", fileExists);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                    .pollEnrich("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&delete=true")
+                    .routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file