You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/20 07:07:19 UTC

camel git commit: CAMEL-9970: CamelFileLength header is wrong for long write file. Implemented a generic solution. Thanks to Sergey Monichev for the test patch we are using.

Repository: camel
Updated Branches:
  refs/heads/master 092c7053e -> 5c47d946e


CAMEL-9970: CamelFileLength header is wrong for long write file. Implemented a generic solution. Thanks to Sergey Monichev for the test patch we are using.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5c47d946
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5c47d946
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5c47d946

Branch: refs/heads/master
Commit: 5c47d946e1f469f515203b594b720b9f5a297ddc
Parents: 092c705
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 20 08:57:27 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri May 20 08:57:27 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/file/FileConsumer.java   | 16 ++++++++++++++++
 .../camel/component/file/GenericFileConsumer.java   | 15 +++++++++++++++
 .../file/strategy/FileChangedReadLockTest.java      | 10 ++++++++++
 .../camel/component/file/remote/FtpConsumer.java    | 15 +++++++++++++++
 .../camel/component/file/remote/SftpConsumer.java   | 15 +++++++++++++++
 .../remote/RemoteFileIgnoreDoPollErrorTest.java     |  6 ++++++
 6 files changed, 77 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 7ffc1a7..5e3a180 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
@@ -234,6 +236,20 @@ public class FileConsumer extends GenericFileConsumer<File> {
     }
 
     @Override
+    protected void updateFileHeaders(GenericFile<File> file, Message message) {
+        long length = file.getFile().length();
+        long modified = file.getFile().lastModified();
+        file.setFileLength(length);
+        file.setLastModified(modified);
+        if (length >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, length);
+        }
+        if (modified >= 0) {
+            message.setHeader(Exchange.FILE_LAST_MODIFIED, modified);
+        }
+    }
+
+    @Override
     public FileEndpoint getEndpoint() {
         return (FileEndpoint) super.getEndpoint();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 399dabd..bf7dc23 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
@@ -396,6 +397,11 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         // must use file from exchange as it can be updated due the
         // preMoveNamePrefix/preMoveNamePostfix options
         final GenericFile<T> target = getExchangeFileProperty(exchange);
+
+        // we can begin processing the file so update file headers on the Camel message
+        // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc
+        updateFileHeaders(target, exchange.getIn());
+
         // must use full name when downloading so we have the correct path
         final String name = target.getAbsoluteFilePath();
         try {
@@ -476,6 +482,15 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     }
 
     /**
+     * Updates the information on {@link Message} after we have acquired read-lock and
+     * can begin process the file.
+     *
+     * @param file    the file
+     * @param message the Camel message to update its headers
+     */
+    protected abstract void updateFileHeaders(GenericFile<T> file, Message message);
+
+    /**
      * Override if required.  Files are retrieved / returns true by default
      *
      * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.

http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
index ede2496..5373bd7 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.slf4j.Logger;
@@ -43,6 +44,7 @@ public class FileChangedReadLockTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         mock.expectedFileExists("target/changed/out/slowfile.dat");
+        mock.expectedHeaderReceived(Exchange.FILE_LENGTH, expectedFileLength());
 
         writeSlowFile();
 
@@ -71,6 +73,14 @@ public class FileChangedReadLockTest extends ContextTestSupport {
         LOG.debug("Writing slow file DONE...");
     }
 
+    long expectedFileLength() {
+        long length = 0;
+        for (int i = 0; i < 20; i++) {
+            length += ("Line " + i + LS).getBytes().length;
+        }
+        return length;
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
index c0a37fe..6ece7e0 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
@@ -253,6 +254,20 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         return answer;
     }
 
+    @Override
+    protected void updateFileHeaders(GenericFile<FTPFile> file, Message message) {
+        long length = file.getFile().getSize();
+        long modified = file.getFile().getTimestamp() != null ? file.getFile().getTimestamp().getTimeInMillis() : -1;
+        file.setFileLength(length);
+        file.setLastModified(modified);
+        if (length >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, length);
+        }
+        if (modified >= 0) {
+            message.setHeader(Exchange.FILE_LAST_MODIFIED, modified);
+        }
+    }
+
     private boolean isStepwise() {
         RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration();
         return config.isStepwise();

http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
index 4be7394..048c5ac 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
@@ -21,6 +21,7 @@ import java.util.List;
 import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.SftpException;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
@@ -223,6 +224,20 @@ public class SftpConsumer extends RemoteFileConsumer<ChannelSftp.LsEntry> {
         return answer;
     }
 
+    @Override
+    protected void updateFileHeaders(GenericFile<ChannelSftp.LsEntry> file, Message message) {
+        long length = file.getFile().getAttrs().getSize();
+        long modified = file.getFile().getAttrs().getMTime() * 1000L;
+        file.setFileLength(length);
+        file.setLastModified(modified);
+        if (length >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, length);
+        }
+        if (modified >= 0) {
+            message.setHeader(Exchange.FILE_LAST_MODIFIED, modified);
+        }
+    }
+
     private boolean isStepwise() {
         RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration();
         return config.isStepwise();

http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java
index bcd4201..b225fa9 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
@@ -110,6 +111,11 @@ public class RemoteFileIgnoreDoPollErrorTest {
             protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
                 return ignoreCannotRetrieveFile;
             }
+
+            @Override
+            protected void updateFileHeaders(GenericFile<Object> genericFile, Message message) {
+                // noop
+            }
         };
     }
 }