You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/04/30 12:44:49 UTC

svn commit: r533687 - in /activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file: FileConsumer.java FileEndpoint.java FileExchange.java FileProducer.java

Author: rajdavies
Date: Mon Apr 30 03:44:48 2007
New Revision: 533687

URL: http://svn.apache.org/viewvc?view=rev&rev=533687
Log:
Allow writing if file locking is not selected as an option

Modified:
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Apr 30 03:44:48 2007
@@ -17,20 +17,11 @@
  */
 package org.apache.camel.component.file;
 
-import java.io.BufferedInputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.RandomAccessFile;
-import java.net.SocketAddress;
 import java.nio.channels.FileChannel;
-import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.locks.Lock;
-import javax.management.Query;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.PollingConsumer;
 import org.apache.commons.logging.Log;
@@ -39,134 +30,126 @@
 /**
  * @version $Revision: 523016 $
  */
-public class FileConsumer extends PollingConsumer<FileExchange> {
-    private static final transient Log log = LogFactory.getLog(FileConsumer.class);
+public class FileConsumer extends PollingConsumer<FileExchange>{
 
+    private static final transient Log log=LogFactory.getLog(FileConsumer.class);
     private final FileEndpoint endpoint;
     private boolean recursive=true;
     private boolean attemptFileLock=false;
-    private String regexPattern = "";
-    private long lastPollTime = 0l;
-   
-   
-
-    public FileConsumer(final FileEndpoint endpoint, Processor<FileExchange> processor,ScheduledExecutorService executor) {
-        super(endpoint, processor,executor);
-        this.endpoint = endpoint;
-        
-        
+    private String regexPattern="";
+    private long lastPollTime=0l;
+
+    public FileConsumer(final FileEndpoint endpoint,Processor<FileExchange> processor,ScheduledExecutorService executor){
+        super(endpoint,processor,executor);
+        this.endpoint=endpoint;
     }
-    protected void poll() throws Exception {
+
+    protected void poll() throws Exception{
         pollFileOrDirectory(endpoint.getFile(),isRecursive());
         lastPollTime=System.currentTimeMillis();
     }
-    
-    
-    protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
-        if (!fileOrDirectory.isDirectory()) {
+
+    protected void pollFileOrDirectory(File fileOrDirectory,boolean processDir){
+        if(!fileOrDirectory.isDirectory()){
             pollFile(fileOrDirectory); // process the file
-        }
-        else if (processDir) {
-            log.debug("Polling directory " + fileOrDirectory);
-            File[] files = fileOrDirectory.listFiles();
-            for (int i = 0; i < files.length; i++) {
-                pollFileOrDirectory(files[i], isRecursive()); // self-recursion
+        }else if(processDir){
+            log.debug("Polling directory "+fileOrDirectory);
+            File[] files=fileOrDirectory.listFiles();
+            for(int i=0;i<files.length;i++){
+                pollFileOrDirectory(files[i],isRecursive()); // self-recursion
             }
-        }
-        else {
-            log.debug("Skipping directory " + fileOrDirectory);
+        }else{
+            log.debug("Skipping directory "+fileOrDirectory);
         }
     }
 
-    protected void pollFile(final File file) {
-        if (file.exists() && file.lastModified() > lastPollTime) {
-            if (isValidFile(file)) {
+    protected void pollFile(final File file){
+        if(file.exists()&&file.lastModified()>lastPollTime){
+            if(isValidFile(file)){
                 processFile(file);
             }
         }
     }
 
-    
-
-    protected void processFile(File file) {
+    protected void processFile(File file){
         getProcessor().process(endpoint.createExchange(file));
     }
-    
-   
-    
+
     protected boolean isValidFile(File file){
         boolean result=false;
         if(file!=null&&file.exists()){
-            if (isMatched(file)) {
-            if(isAttemptFileLock()){
-                FileChannel fc=null;
-                try{
-                    fc=new RandomAccessFile(file,"rw").getChannel();
-                    fc.lock();
-                    result=true;
-                }catch(Throwable e){
-                }finally{
-                    if(fc!=null){
-                        try{
-                            fc.close();
-                        }catch(IOException e){
+            if(isMatched(file)){
+                if(isAttemptFileLock()){
+                    FileChannel fc=null;
+                    try{
+                        fc=new RandomAccessFile(file,"rw").getChannel();
+                        fc.lock();
+                        result=true;
+                    }catch(Throwable e){
+                        log.debug("Failed to get the lock on file: " + file,e);
+                    }finally{
+                        if(fc!=null){
+                            try{
+                                fc.close();
+                            }catch(IOException e){
+                            }
                         }
                     }
+                }else{
+                    result=true;
                 }
             }
-            }
         }
         return result;
     }
-    
-    protected boolean isMatched(File file) {
-        boolean result = true;
-        if ( regexPattern != null  && regexPattern.length() > 0 ) {
-            result = file.getName().matches(getRegexPattern());
+
+    protected boolean isMatched(File file){
+        boolean result=true;
+        if(regexPattern!=null&&regexPattern.length()>0){
+            result=file.getName().matches(getRegexPattern());
         }
         return result;
     }
-    
+
     /**
      * @return the recursive
      */
     public boolean isRecursive(){
         return this.recursive;
     }
-    
+
     /**
      * @param recursive the recursive to set
      */
     public void setRecursive(boolean recursive){
         this.recursive=recursive;
     }
-    
+
     /**
      * @return the attemptFileLock
      */
     public boolean isAttemptFileLock(){
         return this.attemptFileLock;
     }
-    
+
     /**
      * @param attemptFileLock the attemptFileLock to set
      */
     public void setAttemptFileLock(boolean checkAppending){
         this.attemptFileLock=checkAppending;
     }
-    
+
     /**
      * @return the regexPattern
      */
     public String getRegexPattern(){
         return this.regexPattern;
     }
-    
+
     /**
      * @param regexPattern the regexPattern to set
      */
     public void setRegexPattern(String regexPattern){
         this.regexPattern=regexPattern;
     }
-
 }

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java Mon Apr 30 03:44:48 2007
@@ -46,7 +46,7 @@
      * @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor)
      */
     public Consumer<FileExchange> createConsumer(Processor<FileExchange> file) throws Exception{
-        return new FileConsumer(this, file, executor);
+        return new FileConsumer(this, file, getExecutor());
     }
 
     /**
@@ -103,4 +103,6 @@
 		return true;
 	}
   
+    
+
 }

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java Mon Apr 30 03:44:48 2007
@@ -23,7 +23,7 @@
 import org.apache.camel.impl.DefaultExchange;
 
 /**
- * A {@link Exchange} for MINA
+ * A {@link Exchange} for File
  * 
  * @version $Revision: 520985 $
  */

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java Mon Apr 30 03:44:48 2007
@@ -24,30 +24,32 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * A {@link Producer} implementation for MINA
+ * A {@link Producer} implementation for File
  * 
  * @version $Revision: 523016 $
  */
 public class FileProducer extends DefaultProducer<FileExchange>{
-    private static final transient Log log = LogFactory.getLog(FileProducer.class);
 
+    private static final transient Log log=LogFactory.getLog(FileProducer.class);
     private final FileEndpoint endpoint;
+
     public FileProducer(FileEndpoint endpoint){
         super(endpoint);
-        this.endpoint = endpoint;
+        this.endpoint=endpoint;
     }
 
     /**
-     * @param arg0
+     * @param exchange
      * @see org.apache.camel.Processor#process(java.lang.Object)
      */
     public void process(FileExchange exchange){
-        ByteBuffer payload = exchange.getIn().getBody(ByteBuffer.class);
-        File file = null;
-        if (endpoint.getFile() != null && endpoint.getFile().isDirectory()) {
-            file = new File(endpoint.getFile(),exchange.getFile().getName());
-        }else {
-            file = exchange.getFile();
+        ByteBuffer payload=exchange.getIn().getBody(ByteBuffer.class);
+        payload.flip();
+        File file=null;
+        if(endpoint.getFile()!=null&&endpoint.getFile().isDirectory()){
+            file=new File(endpoint.getFile(),exchange.getFile().getName());
+        }else{
+            file=exchange.getFile();
         }
         try{
             FileChannel fc=new RandomAccessFile(file,"rw").getChannel();
@@ -55,7 +57,7 @@
             fc.write(payload);
             fc.close();
         }catch(Throwable e){
-            log.error("Failed to write to File: " + file,e);
+            log.error("Failed to write to File: "+file,e);
         }
     }
 }