You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/10/30 18:35:30 UTC

svn commit: r469204 - in /incubator/servicemix/trunk: servicemix-file/src/main/java/org/apache/servicemix/file/ servicemix-file/src/test/java/org/apache/servicemix/file/ servicemix-file/src/test/resources/ servicemix-ftp/ servicemix-ftp/src/main/java/o...

Author: gnodet
Date: Mon Oct 30 09:35:29 2006
New Revision: 469204

URL: http://svn.apache.org/viewvc?view=rev&rev=469204
Log:
Implement FtpPollingEndpoint
Add FTPClient validation by default when retrieving a client from the pool
Use LockManager (SM-725) for ftp / file polling endpoints

Added:
    incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java
    incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java
    incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml
Modified:
    incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java
    incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java
    incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml
    incubator/servicemix/trunk/servicemix-ftp/pom.xml
    incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java
    incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java
    incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java
    incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml

Modified: incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java Mon Oct 30 09:35:29 2006
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,18 +22,23 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Set;
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
 
+import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.ServiceUnit;
 import org.apache.servicemix.common.endpoints.PollingEndpoint;
 import org.apache.servicemix.components.util.DefaultFileMarshaler;
 import org.apache.servicemix.components.util.FileMarshaler;
+import org.apache.servicemix.locks.LockManager;
+import org.apache.servicemix.locks.impl.SimpleLockManager;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
 
 /**
  * A polling endpoint which looks for a file or files in a directory
@@ -53,7 +57,18 @@
     private boolean recursive = true;
     private boolean autoCreateDirectory = true;
     private FileMarshaler marshaler = new DefaultFileMarshaler();
-    private Set workingSet = new CopyOnWriteArraySet();
+    private LockManager lockManager;
+
+    public FilePollingEndpoint() {
+    }
+
+    public FilePollingEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
+        super(serviceUnit, service, endpoint);
+    }
+
+    public FilePollingEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
+        super(component, endpoint);
+    }
 
     public void poll() throws Exception {
         pollFileOrDirectory(file);
@@ -67,6 +82,13 @@
         if (isAutoCreateDirectory() && !file.exists()) {
             file.mkdirs();
         }
+        if (lockManager == null) {
+            lockManager = createLockManager();
+        }
+    }
+    
+    protected LockManager createLockManager() {
+        return new SimpleLockManager();
     }
 
 
@@ -85,6 +107,20 @@
         this.file = file;
     }
 
+    /**
+     * @return the lockManager
+     */
+    public LockManager getLockManager() {
+        return lockManager;
+    }
+
+    /**
+     * @param lockManager the lockManager to set
+     */
+    public void setLockManager(LockManager lockManager) {
+        this.lockManager = lockManager;
+    }
+
     public FileFilter getFilter() {
         return filter;
     }
@@ -131,15 +167,6 @@
         this.marshaler = marshaler;
     }
 
-    /**
-     * The set of FTPFiles that this component is currently working on
-     *
-     * @return
-     */
-    public Set getWorkingSet() {
-        return workingSet;
-    }
-
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -165,21 +192,27 @@
     }
 
     protected void pollFile(final File aFile) {
-        if (workingSet.add(aFile)) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Scheduling file " + aFile + " for processing");
-            }
-            getExecutor().execute(new Runnable() {
-                public void run() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Scheduling file " + aFile + " for processing");
+        }
+        getExecutor().execute(new Runnable() {
+            public void run() {
+                String uri = file.toURI().relativize(aFile.toURI()).toString();
+                Lock lock = lockManager.getLock(uri);
+                if (lock.tryLock()) {
                     try {
                         processFileAndDelete(aFile);
                     }
                     finally {
-                        workingSet.remove(aFile);
+                        lock.unlock();
+                    }
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Unable to acquire lock on " + aFile);
                     }
                 }
-            });
-        }
+            }
+        });
     }
 
     protected void processFileAndDelete(File aFile) {

Modified: incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java (original)
+++ incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java Mon Oct 30 09:35:29 2006
@@ -24,8 +24,10 @@
 import javax.xml.namespace.QName;
 
 import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.Receiver;
 import org.apache.servicemix.tck.SpringTestSupport;
 import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
 import org.springframework.context.support.AbstractXmlApplicationContext;
@@ -35,23 +37,25 @@
     protected String dynamicURI = "file:" + directoryName;
 
 
+    private int NUMBER = 10;
+
     public void testSendToWriterSoItCanBePolled() throws Exception {
         // now lets make a request on this endpoint
         DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
 
         // lets send a request to be written to a file
         // which should then be polled
-        InOnly me = client.createInOnlyExchange();
-        me.setService(new QName("urn:test", "service"));
-        NormalizedMessage message = me.getInMessage();
-
-        message.setProperty("name", "cheese");
-        message.setContent(new StringSource("<hello>world</hello>"));
-
-        client.sendSync(me);
-
+        for (int i = 0; i < NUMBER; i++) {
+            InOnly me = client.createInOnlyExchange();
+            me.setService(new QName("urn:test", "service"));
+            NormalizedMessage message = me.getInMessage();
+            message.setProperty(DefaultFileMarshaler.FILE_NAME_PROPERTY, "test" + i + ".xml");
+            message.setContent(new StringSource("<hello>world</hello>"));
+            client.sendSync(me);
+        }
 
-        Thread.sleep(5000);
+        Receiver receiver = (Receiver) getBean("receiver");
+        receiver.getMessageList().assertMessagesReceived(NUMBER);
     }
 
     protected void assertExchangeWorked(MessageExchange me) throws Exception {

Modified: incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml (original)
+++ incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml Mon Oct 30 09:35:29 2006
@@ -31,19 +31,25 @@
             	<file:endpoints>
                 <file:endpoint service="test:service"
                                endpoint="endpoint"
-                               directory="file:target/componentOutput" />
+                               directory="file:target/pollerFiles" />
             	</file:endpoints>
               
               <file:pollingEndpoints>
                 <file:pollingEndpoint service="test:poller"
                                       endpoint="poller"
-                                      targetService="test:service"
+                                      targetService="test:receiver"
                                       file="file:target/pollerFiles" />
               </file:pollingEndpoints>
             </file:component>
         </sm:component>
       </sm:activationSpec>
-
+      
+      <sm:activationSpec id="receiver" service="test:receiver">
+        <sm:component>
+          <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+        </sm:component>
+      </sm:activationSpec>
+      
     </sm:activationSpecs>
   </sm:container>
 

Modified: incubator/servicemix/trunk/servicemix-ftp/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/pom.xml?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/pom.xml (original)
+++ incubator/servicemix/trunk/servicemix-ftp/pom.xml Mon Oct 30 09:35:29 2006
@@ -50,8 +50,11 @@
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
-      <optional>true</optional>
-    </dependency>    
+    </dependency>
+    <dependency>
+      <groupId>oro</groupId>
+      <artifactId>oro</artifactId>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

Modified: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java Mon Oct 30 09:35:29 2006
@@ -17,6 +17,7 @@
 package org.apache.servicemix.ftp;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -36,6 +37,7 @@
 public class FtpComponent extends DefaultComponent {
 
     private FtpEndpoint[] endpoints;
+    private FtpPollingEndpoint[] pollingEndpoints;
 
     public FtpEndpoint[] getEndpoints() {
         return endpoints;
@@ -45,12 +47,29 @@
         this.endpoints = endpoints;
     }
 
+    /**
+     * @return the pollingEndpoints
+     */
+    public FtpPollingEndpoint[] getPollingEndpoints() {
+        return pollingEndpoints;
+    }
+
+    /**
+     * @param pollingEndpoints the pollingEndpoints to set
+     */
+    public void setPollingEndpoints(FtpPollingEndpoint[] pollingEndpoints) {
+        this.pollingEndpoints = pollingEndpoints;
+    }
+
     protected List getConfiguredEndpoints() {
-        return asList(getEndpoints());
+        ArrayList l = new ArrayList();
+        l.addAll(asList(getEndpoints()));
+        l.addAll(asList(getPollingEndpoints()));
+        return l;
     }
 
     protected Class[] getEndpointClasses() {
-        return new Class[]{FtpEndpoint.class};
+        return new Class[] { FtpEndpoint.class, FtpPollingEndpoint.class };
     }
 
     protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {

Modified: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java Mon Oct 30 09:35:29 2006
@@ -16,19 +16,20 @@
  */
 package org.apache.servicemix.ftp;
 
-import org.apache.commons.net.SocketClient;
-import org.apache.commons.net.ftp.FTPClient;
-import org.apache.servicemix.common.endpoints.ProviderEndpoint;
-import org.apache.servicemix.components.util.DefaultFileMarshaler;
-import org.apache.servicemix.components.util.FileMarshaler;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
 
+import javax.jbi.JBIException;
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.components.util.FileMarshaler;
 
 /**
  * An FTP endpoint
@@ -133,7 +134,7 @@
         FTPClient client = null;
         OutputStream out = null;
         try {
-            client = (FTPClient) getClientPool().borrowClient();
+            client = borrowClient();
             // Change to the directory specified by the URI path if any
             if (uri != null && uri.getPath() != null) {
                 client.changeWorkingDirectory(uri.getPath());
@@ -162,7 +163,6 @@
                 throw new IOException("No output stream available for output name: " + name + ". Maybe the file already exists?");
             }
             marshaler.writeMessage(exchange, message, out, name);
-            done(exchange);
         }
         finally {
             returnClient(client);
@@ -183,7 +183,16 @@
         return pool;
     }
 
-    protected void returnClient(SocketClient client) {
+    protected FTPClient borrowClient() throws JBIException {
+        try {
+            return (FTPClient) getClientPool().borrowClient();
+        }
+        catch (Exception e) {
+            throw new JBIException(e);
+        }
+    }
+
+    protected void returnClient(FTPClient client) {
         if (client != null) {
             try {
                 getClientPool().returnClient(client);

Added: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java?view=auto&rev=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java (added)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java Mon Oct 30 09:35:29 2006
@@ -0,0 +1,313 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.ftp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import javax.jbi.JBIException;
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.ServiceUnit;
+import org.apache.servicemix.common.endpoints.PollingEndpoint;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.components.util.FileMarshaler;
+import org.apache.servicemix.locks.LockManager;
+import org.apache.servicemix.locks.impl.SimpleLockManager;
+
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+
+/**
+ * A polling endpoint which looks for a file or files in a directory
+ * and sends the files into the JBI bus as messages, deleting the files
+ * by default when they are processed.
+ *
+ * @org.apache.xbean.XBean element="pollingEndpoint"
+ *
+ * @version $Revision: 468487 $
+ */
+public class FtpPollingEndpoint extends PollingEndpoint {
+
+    private FTPClientPool clientPool;
+    private FileFilter filter;
+    private boolean deleteFile = true;
+    private boolean recursive = true;
+    private FileMarshaler marshaler = new DefaultFileMarshaler();
+    private LockManager lockManager;
+    private URI uri;
+
+    public FtpPollingEndpoint() {
+    }
+
+    public FtpPollingEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
+        super(serviceUnit, service, endpoint);
+    }
+
+    public FtpPollingEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
+        super(component, endpoint);
+    }
+
+    public void poll() throws Exception {
+        pollFileOrDirectory(getWorkingPath());
+    }
+
+    public void validate() throws DeploymentException {
+        super.validate();
+        if (uri == null && (getClientPool() == null || getClientPool().getHost() == null)) {
+            throw new DeploymentException("Property uri or clientPool.host must be configured");
+        }
+        if (uri != null && getClientPool() != null && getClientPool().getHost() != null) {
+            throw new DeploymentException("Properties uri and clientPool.host can not be configured at the same time");
+        }
+    }
+    
+    public void start() throws Exception {
+        if (lockManager == null) {
+            lockManager = createLockManager();
+        }
+        if (clientPool == null) {
+            clientPool = createClientPool();
+        }
+        if (uri != null) {
+            clientPool.setHost(uri.getHost());
+            clientPool.setPort(uri.getPort());
+            if (uri.getUserInfo() != null) {
+                String[] infos = uri.getUserInfo().split(":");
+                clientPool.setUsername(infos[0]);
+                if (infos.length > 1) {
+                    clientPool.setPassword(infos[1]);
+                }
+            }
+        } else {
+            String str = "ftp://" + clientPool.getHost();
+            if (clientPool.getPort() >= 0) {
+                str += ":" + clientPool.getPort();
+            }
+            str += "/";
+            uri = new URI(str);
+        }
+        super.start();
+    }
+    
+    protected LockManager createLockManager() {
+        return new SimpleLockManager();
+    }
+
+    private String getWorkingPath() {
+        return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    /**
+     * @return the clientPool
+     */
+    public FTPClientPool getClientPool() {
+        return clientPool;
+    }
+
+    /**
+     * @param clientPool the clientPool to set
+     */
+    public void setClientPool(FTPClientPool clientPool) {
+        this.clientPool = clientPool;
+    }
+
+    /**
+     * @return the uri
+     */
+    public URI getUri() {
+        return uri;
+    }
+
+    /**
+     * @param uri the uri to set
+     */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+
+    public FileFilter getFilter() {
+        return filter;
+    }
+
+    /**
+     * Sets the optional filter to choose which files to process
+     */
+    public void setFilter(FileFilter filter) {
+        this.filter = filter;
+    }
+
+    /**
+     * Returns whether or not we should delete the file when its processed
+     */
+    public boolean isDeleteFile() {
+        return deleteFile;
+    }
+
+    public void setDeleteFile(boolean deleteFile) {
+        this.deleteFile = deleteFile;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
+    public FileMarshaler getMarshaler() {
+        return marshaler;
+    }
+
+    public void setMarshaler(FileMarshaler marshaler) {
+        this.marshaler = marshaler;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+
+    protected void pollFileOrDirectory(String fileOrDirectory) throws Exception {
+        FTPClient ftp = borrowClient();
+        try {
+            logger.debug("Polling directory " + fileOrDirectory);
+            pollFileOrDirectory(ftp, fileOrDirectory, true);
+        }
+        finally {
+            returnClient(ftp);
+        }
+    }
+
+    protected void pollFileOrDirectory(FTPClient ftp, String fileOrDirectory, boolean processDir) throws Exception {
+        FTPFile[] files = ftp.listFiles(fileOrDirectory);
+        for (int i = 0; i < files.length; i++) {
+            String file = fileOrDirectory + "/" + files[i].getName();
+            if (!files[i].isDirectory()) {
+                if (getFilter() == null || getFilter().accept(new File(file))) {
+                    pollFile(file); // process the file
+                }
+            } else if (processDir) {
+                logger.debug("Polling directory " + file);
+                pollFileOrDirectory(ftp, file, isRecursive());
+            } else {
+                logger.debug("Skipping directory " + file);
+            }
+        }
+    }
+
+    protected void pollFile(final String file) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Scheduling file " + file + " for processing");
+        }
+        getExecutor().execute(new Runnable() {
+            public void run() {
+                final Lock lock = lockManager.getLock(file);
+                if (lock.tryLock()) {
+                    try {
+                        processFileAndDelete(file);
+                    }
+                    finally {
+                        lock.unlock();
+                    }
+                }
+            }
+        });
+    }
+
+    protected void processFileAndDelete(String file) {
+        FTPClient ftp = null;
+        try {
+            ftp = borrowClient();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing file " + file);
+            }
+            processFile(ftp, file);
+            if (isDeleteFile()) {
+                if (!ftp.deleteFile(file)) {
+                    throw new IOException("Could not delete file " + file);
+                }
+            }
+        }
+        catch (Exception e) {
+            logger.error("Failed to process file: " + file + ". Reason: " + e, e);
+        } finally {
+            returnClient(ftp);
+        }
+    }
+
+    protected void processFile(FTPClient ftp, String file) throws Exception {
+        InputStream in = ftp.retrieveFileStream(file);
+        ftp.completePendingCommand();
+        InOnly exchange = getExchangeFactory().createInOnlyExchange();
+        configureExchangeTarget(exchange);
+        NormalizedMessage message = exchange.createMessage();
+        exchange.setInMessage(message);
+        marshaler.readMessage(exchange, message, in, file);
+        sendSync(exchange);
+        in.close();
+    }
+
+    public String getLocationURI() {
+        return uri.toString();
+    }
+
+    public void process(MessageExchange exchange) throws Exception {
+        // Do nothing. In our case, this method should never be called
+        // as we only send synchronous InOnly exchange
+    }
+
+    protected FTPClientPool createClientPool() throws Exception {
+        FTPClientPool pool = new FTPClientPool();
+        pool.afterPropertiesSet();
+        return pool;
+    }
+
+    protected FTPClient borrowClient() throws JBIException {
+        try {
+            return (FTPClient) getClientPool().borrowClient();
+        }
+        catch (Exception e) {
+            throw new JBIException(e);
+        }
+    }
+
+    protected void returnClient(FTPClient client) {
+        if (client != null) {
+            try {
+                getClientPool().returnClient(client);
+            }
+            catch (Exception e) {
+                logger.error("Failed to return client to pool: " + e, e);
+            }
+        }
+    }
+    
+}

Modified: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java Mon Oct 30 09:35:29 2006
@@ -44,7 +44,9 @@
 
     public void afterPropertiesSet() throws Exception {
         if (pool == null) {
-            pool = new GenericObjectPool();
+            GenericObjectPool goPool = new GenericObjectPool();
+            goPool.setTestOnBorrow(true);
+            pool = goPool;
         }
         pool.setFactory(this);
     }

Added: incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java?view=auto&rev=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java (added)
+++ incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java Mon Oct 30 09:35:29 2006
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.ftp;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.SpringTestSupport;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+
+public class PollDirectoryTest extends SpringTestSupport {
+    protected String directoryName = "target/pollDirectory";
+    protected String dynamicURI = "file:" + directoryName;
+
+    private int NUMBER = 10;
+
+    public void testSendToWriterSoItCanBePolled() throws Exception {
+        // now lets make a request on this endpoint
+        DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
+
+        // lets send a request to be written to a file
+        // which should then be polled
+        for (int i = 0; i < NUMBER; i++) {
+            InOnly me = client.createInOnlyExchange();
+            me.setService(new QName("urn:test", "service"));
+            NormalizedMessage message = me.getInMessage();
+            message.setProperty(DefaultFileMarshaler.FILE_NAME_PROPERTY, "test" + i + ".xml");
+            message.setContent(new StringSource("<hello>world</hello>"));
+            client.sendSync(me);
+        }
+
+        Receiver receiver = (Receiver) getBean("receiver");
+        receiver.getMessageList().assertMessagesReceived(NUMBER);
+    }
+
+    protected void assertExchangeWorked(MessageExchange me) throws Exception {
+        if (me.getStatus() == ExchangeStatus.ERROR) {
+            if (me.getError() != null) {
+                throw me.getError();
+            }
+            else {
+                fail("Received ERROR status");
+            }
+        }
+        else if (me.getFault() != null) {
+            fail("Received fault: " + new SourceTransformer().toString(me.getFault().getContent()));
+        }
+    }
+
+    protected AbstractXmlApplicationContext createBeanFactory() {
+        return new ClassPathXmlApplicationContext("spring-polling.xml");
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml?view=auto&rev=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml (added)
+++ incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml Mon Oct 30 09:35:29 2006
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns:sm="http://servicemix.apache.org/config/1.0" 
+	     xmlns:ftp="http://servicemix.apache.org/ftp/1.0"
+       xmlns:test="urn:test">
+
+  <sm:container id="jbi" embedded="true" createMBeanServer="false">
+    
+    <sm:activationSpecs>
+
+      <sm:activationSpec>
+      	<sm:component>
+            <ftp:component>
+            	<ftp:endpoints>
+            		<ftp:endpoint service="test:service"
+                              endpoint="endpoint" 
+                              uri="ftp://servicemix:rocks@localhost/smx/test" />
+            	</ftp:endpoints>
+            	
+              <ftp:pollingEndpoints>
+                <ftp:pollingEndpoint service="test:poller"
+                                     endpoint="endpoint"
+                                     targetService="test:receiver"
+                                     uri="ftp://servicemix:rocks@localhost/smx/test" />
+              </ftp:pollingEndpoints>
+            </ftp:component>
+        </sm:component>
+      </sm:activationSpec>
+      
+      <sm:activationSpec id="receiver" service="test:receiver">
+        <sm:component>
+          <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+        </sm:component>
+      </sm:activationSpec>
+
+    </sm:activationSpecs>
+  </sm:container>
+
+</beans>

Modified: incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml Mon Oct 30 09:35:29 2006
@@ -29,8 +29,10 @@
       	<sm:component>
             <ftp:component>
             	<ftp:endpoints>
-            		<ftp:endpoint service="test:service" endpoint="endpoint" uri="ftp://servicemix:rocks@localhost/smx/test"/>
-            	</ftp:endpoints>
+            		<ftp:endpoint service="test:service"
+                              endpoint="endpoint" 
+                              uri="ftp://servicemix:rocks@localhost/smx/test" />
+            	</ftp:endpoints>
             </ftp:component>
         </sm:component>
       </sm:activationSpec>