You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/10/30 18:35:30 UTC
svn commit: r469204 - in /incubator/servicemix/trunk:
servicemix-file/src/main/java/org/apache/servicemix/file/
servicemix-file/src/test/java/org/apache/servicemix/file/
servicemix-file/src/test/resources/ servicemix-ftp/
servicemix-ftp/src/main/java/o...
Author: gnodet
Date: Mon Oct 30 09:35:29 2006
New Revision: 469204
URL: http://svn.apache.org/viewvc?view=rev&rev=469204
Log:
Implement FtpPollingEndpoint
Add FTPClient validation by default when retrieving a client from the pool
Use LockManager (SM-725) for ftp / file polling endpoints
Added:
incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java
incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java
incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml
Modified:
incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java
incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java
incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml
incubator/servicemix/trunk/servicemix-ftp/pom.xml
incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java
incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java
incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java
incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml
Modified: incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollingEndpoint.java Mon Oct 30 09:35:29 2006
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,18 +22,23 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Set;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.PollingEndpoint;
import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.components.util.FileMarshaler;
+import org.apache.servicemix.locks.LockManager;
+import org.apache.servicemix.locks.impl.SimpleLockManager;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
/**
* A polling endpoint which looks for a file or files in a directory
@@ -53,7 +57,18 @@
private boolean recursive = true;
private boolean autoCreateDirectory = true;
private FileMarshaler marshaler = new DefaultFileMarshaler();
- private Set workingSet = new CopyOnWriteArraySet();
+ private LockManager lockManager;
+
+ public FilePollingEndpoint() {
+ }
+
+ public FilePollingEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
+ super(serviceUnit, service, endpoint);
+ }
+
+ public FilePollingEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
+ super(component, endpoint);
+ }
public void poll() throws Exception {
pollFileOrDirectory(file);
@@ -67,6 +82,13 @@
if (isAutoCreateDirectory() && !file.exists()) {
file.mkdirs();
}
+ if (lockManager == null) {
+ lockManager = createLockManager();
+ }
+ }
+
+ protected LockManager createLockManager() {
+ return new SimpleLockManager();
}
@@ -85,6 +107,20 @@
this.file = file;
}
+ /**
+ * @return the lockManager
+ */
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
+ /**
+ * @param lockManager the lockManager to set
+ */
+ public void setLockManager(LockManager lockManager) {
+ this.lockManager = lockManager;
+ }
+
public FileFilter getFilter() {
return filter;
}
@@ -131,15 +167,6 @@
this.marshaler = marshaler;
}
- /**
- * The set of FTPFiles that this component is currently working on
- *
- * @return
- */
- public Set getWorkingSet() {
- return workingSet;
- }
-
// Implementation methods
//-------------------------------------------------------------------------
@@ -165,21 +192,27 @@
}
protected void pollFile(final File aFile) {
- if (workingSet.add(aFile)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Scheduling file " + aFile + " for processing");
- }
- getExecutor().execute(new Runnable() {
- public void run() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Scheduling file " + aFile + " for processing");
+ }
+ getExecutor().execute(new Runnable() {
+ public void run() {
+ String uri = file.toURI().relativize(aFile.toURI()).toString();
+ Lock lock = lockManager.getLock(uri);
+ if (lock.tryLock()) {
try {
processFileAndDelete(aFile);
}
finally {
- workingSet.remove(aFile);
+ lock.unlock();
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Unable to acquire lock on " + aFile);
}
}
- });
- }
+ }
+ });
}
protected void processFileAndDelete(File aFile) {
Modified: incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java (original)
+++ incubator/servicemix/trunk/servicemix-file/src/test/java/org/apache/servicemix/file/PollDirectoryTest.java Mon Oct 30 09:35:29 2006
@@ -24,8 +24,10 @@
import javax.xml.namespace.QName;
import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.Receiver;
import org.apache.servicemix.tck.SpringTestSupport;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.support.AbstractXmlApplicationContext;
@@ -35,23 +37,25 @@
protected String dynamicURI = "file:" + directoryName;
+ private int NUMBER = 10;
+
public void testSendToWriterSoItCanBePolled() throws Exception {
// now lets make a request on this endpoint
DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
// lets send a request to be written to a file
// which should then be polled
- InOnly me = client.createInOnlyExchange();
- me.setService(new QName("urn:test", "service"));
- NormalizedMessage message = me.getInMessage();
-
- message.setProperty("name", "cheese");
- message.setContent(new StringSource("<hello>world</hello>"));
-
- client.sendSync(me);
-
+ for (int i = 0; i < NUMBER; i++) {
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("urn:test", "service"));
+ NormalizedMessage message = me.getInMessage();
+ message.setProperty(DefaultFileMarshaler.FILE_NAME_PROPERTY, "test" + i + ".xml");
+ message.setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(me);
+ }
- Thread.sleep(5000);
+ Receiver receiver = (Receiver) getBean("receiver");
+ receiver.getMessageList().assertMessagesReceived(NUMBER);
}
protected void assertExchangeWorked(MessageExchange me) throws Exception {
Modified: incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml (original)
+++ incubator/servicemix/trunk/servicemix-file/src/test/resources/spring-polling.xml Mon Oct 30 09:35:29 2006
@@ -31,19 +31,25 @@
<file:endpoints>
<file:endpoint service="test:service"
endpoint="endpoint"
- directory="file:target/componentOutput" />
+ directory="file:target/pollerFiles" />
</file:endpoints>
<file:pollingEndpoints>
<file:pollingEndpoint service="test:poller"
endpoint="poller"
- targetService="test:service"
+ targetService="test:receiver"
file="file:target/pollerFiles" />
</file:pollingEndpoints>
</file:component>
</sm:component>
</sm:activationSpec>
-
+
+ <sm:activationSpec id="receiver" service="test:receiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+ </sm:component>
+ </sm:activationSpec>
+
</sm:activationSpecs>
</sm:container>
Modified: incubator/servicemix/trunk/servicemix-ftp/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/pom.xml?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/pom.xml (original)
+++ incubator/servicemix/trunk/servicemix-ftp/pom.xml Mon Oct 30 09:35:29 2006
@@ -50,8 +50,11 @@
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
- <optional>true</optional>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>oro</groupId>
+ <artifactId>oro</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Modified: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpComponent.java Mon Oct 30 09:35:29 2006
@@ -17,6 +17,7 @@
package org.apache.servicemix.ftp;
import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -36,6 +37,7 @@
public class FtpComponent extends DefaultComponent {
private FtpEndpoint[] endpoints;
+ private FtpPollingEndpoint[] pollingEndpoints;
public FtpEndpoint[] getEndpoints() {
return endpoints;
@@ -45,12 +47,29 @@
this.endpoints = endpoints;
}
+ /**
+ * @return the pollingEndpoints
+ */
+ public FtpPollingEndpoint[] getPollingEndpoints() {
+ return pollingEndpoints;
+ }
+
+ /**
+ * @param pollingEndpoints the pollingEndpoints to set
+ */
+ public void setPollingEndpoints(FtpPollingEndpoint[] pollingEndpoints) {
+ this.pollingEndpoints = pollingEndpoints;
+ }
+
protected List getConfiguredEndpoints() {
- return asList(getEndpoints());
+ ArrayList l = new ArrayList();
+ l.addAll(asList(getEndpoints()));
+ l.addAll(asList(getPollingEndpoints()));
+ return l;
}
protected Class[] getEndpointClasses() {
- return new Class[]{FtpEndpoint.class};
+ return new Class[] { FtpEndpoint.class, FtpPollingEndpoint.class };
}
protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
Modified: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpEndpoint.java Mon Oct 30 09:35:29 2006
@@ -16,19 +16,20 @@
*/
package org.apache.servicemix.ftp;
-import org.apache.commons.net.SocketClient;
-import org.apache.commons.net.ftp.FTPClient;
-import org.apache.servicemix.common.endpoints.ProviderEndpoint;
-import org.apache.servicemix.components.util.DefaultFileMarshaler;
-import org.apache.servicemix.components.util.FileMarshaler;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.components.util.FileMarshaler;
/**
* An FTP endpoint
@@ -133,7 +134,7 @@
FTPClient client = null;
OutputStream out = null;
try {
- client = (FTPClient) getClientPool().borrowClient();
+ client = borrowClient();
// Change to the directory specified by the URI path if any
if (uri != null && uri.getPath() != null) {
client.changeWorkingDirectory(uri.getPath());
@@ -162,7 +163,6 @@
throw new IOException("No output stream available for output name: " + name + ". Maybe the file already exists?");
}
marshaler.writeMessage(exchange, message, out, name);
- done(exchange);
}
finally {
returnClient(client);
@@ -183,7 +183,16 @@
return pool;
}
- protected void returnClient(SocketClient client) {
+ protected FTPClient borrowClient() throws JBIException {
+ try {
+ return (FTPClient) getClientPool().borrowClient();
+ }
+ catch (Exception e) {
+ throw new JBIException(e);
+ }
+ }
+
+ protected void returnClient(FTPClient client) {
if (client != null) {
try {
getClientPool().returnClient(client);
Added: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java?view=auto&rev=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java (added)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollingEndpoint.java Mon Oct 30 09:35:29 2006
@@ -0,0 +1,313 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.ftp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import javax.jbi.JBIException;
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.ServiceUnit;
+import org.apache.servicemix.common.endpoints.PollingEndpoint;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.components.util.FileMarshaler;
+import org.apache.servicemix.locks.LockManager;
+import org.apache.servicemix.locks.impl.SimpleLockManager;
+
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+
+/**
+ * A polling endpoint which looks for a file or files in a directory
+ * on a remote FTP server and sends the files into the JBI bus as messages,
+ * deleting the files by default when they are processed.
+ *
+ * @org.apache.xbean.XBean element="pollingEndpoint"
+ *
+ * @version $Revision: 468487 $
+ */
+public class FtpPollingEndpoint extends PollingEndpoint {
+
+ private FTPClientPool clientPool;
+ private FileFilter filter;
+ private boolean deleteFile = true;
+ private boolean recursive = true;
+ private FileMarshaler marshaler = new DefaultFileMarshaler();
+ private LockManager lockManager;
+ private URI uri;
+
+ public FtpPollingEndpoint() {
+ }
+
+ public FtpPollingEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
+ super(serviceUnit, service, endpoint);
+ }
+
+ public FtpPollingEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
+ super(component, endpoint);
+ }
+
+ public void poll() throws Exception {
+ pollFileOrDirectory(getWorkingPath());
+ }
+
+ /**
+ * Validates the endpoint configuration.
+ * Exactly one of the <code>uri</code> property or the client pool host
+ * must be configured: setting neither, or setting both, is rejected.
+ *
+ * @throws DeploymentException if neither or both of uri and clientPool.host are set
+ */
+ public void validate() throws DeploymentException {
+ super.validate();
+ // Neither a uri nor a pool host: there is no server to connect to
+ if (uri == null && (getClientPool() == null || getClientPool().getHost() == null)) {
+ throw new DeploymentException("Property uri or clientPool.host must be configured");
+ }
+ // Both set: the two sources of connection information would conflict
+ if (uri != null && getClientPool() != null && getClientPool().getHost() != null) {
+ throw new DeploymentException("Properties uri and clientPool.host can not be configured at the same time");
+ }
+ }
+
+ /**
+  * Starts the endpoint.
+  * Creates the default lock manager and client pool when none were injected,
+  * then reconciles the connection settings: when a uri is configured its host,
+  * port and user info are copied onto the client pool; otherwise a uri of the
+  * form <code>ftp://host[:port]/</code> is derived from the client pool.
+  *
+  * @throws Exception if the client pool can not be created, the derived uri
+  *                   is invalid, or the parent endpoint fails to start
+  */
+ public void start() throws Exception {
+     if (lockManager == null) {
+         lockManager = createLockManager();
+     }
+     if (clientPool == null) {
+         clientPool = createClientPool();
+     }
+     if (uri != null) {
+         clientPool.setHost(uri.getHost());
+         clientPool.setPort(uri.getPort());
+         if (uri.getUserInfo() != null) {
+             // Split on the first ':' only, so that a password which itself
+             // contains ':' is passed to the pool in full instead of truncated
+             String[] infos = uri.getUserInfo().split(":", 2);
+             clientPool.setUsername(infos[0]);
+             if (infos.length > 1) {
+                 clientPool.setPassword(infos[1]);
+             }
+         }
+     } else {
+         // No uri configured: build one from the pool settings
+         String str = "ftp://" + clientPool.getHost();
+         if (clientPool.getPort() >= 0) {
+             str += ":" + clientPool.getPort();
+         }
+         str += "/";
+         uri = new URI(str);
+     }
+     super.start();
+ }
+
+ protected LockManager createLockManager() {
+ return new SimpleLockManager();
+ }
+
+ /**
+ * Returns the remote path to poll: the path component of the configured
+ * uri, or "." when no uri is set or the uri has no path.
+ */
+ private String getWorkingPath() {
+ return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ /**
+ * @return the clientPool
+ */
+ public FTPClientPool getClientPool() {
+ return clientPool;
+ }
+
+ /**
+ * @param clientPool the clientPool to set
+ */
+ public void setClientPool(FTPClientPool clientPool) {
+ this.clientPool = clientPool;
+ }
+
+ /**
+ * @return the uri
+ */
+ public URI getUri() {
+ return uri;
+ }
+
+ /**
+ * @param uri the uri to set
+ */
+ public void setUri(URI uri) {
+ this.uri = uri;
+ }
+
+ public FileFilter getFilter() {
+ return filter;
+ }
+
+ /**
+ * Sets the optional filter to choose which files to process
+ */
+ public void setFilter(FileFilter filter) {
+ this.filter = filter;
+ }
+
+ /**
+ * Returns whether or not we should delete the file when it is processed
+ */
+ public boolean isDeleteFile() {
+ return deleteFile;
+ }
+
+ public void setDeleteFile(boolean deleteFile) {
+ this.deleteFile = deleteFile;
+ }
+
+ public boolean isRecursive() {
+ return recursive;
+ }
+
+ public void setRecursive(boolean recursive) {
+ this.recursive = recursive;
+ }
+
+ public FileMarshaler getMarshaler() {
+ return marshaler;
+ }
+
+ public void setMarshaler(FileMarshaler marshaler) {
+ this.marshaler = marshaler;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+
+ /**
+ * Polls the given remote path using a client borrowed from the pool.
+ * The client is always handed back to the pool, even if polling fails.
+ *
+ * @param fileOrDirectory the remote path to list
+ * @throws Exception if a client can not be borrowed or the listing fails
+ */
+ protected void pollFileOrDirectory(String fileOrDirectory) throws Exception {
+ FTPClient ftp = borrowClient();
+ try {
+ logger.debug("Polling directory " + fileOrDirectory);
+ // The top-level path is always descended into (processDir = true)
+ pollFileOrDirectory(ftp, fileOrDirectory, true);
+ }
+ finally {
+ returnClient(ftp);
+ }
+ }
+
+ /**
+  * Lists the given remote path and schedules every accepted file for
+  * processing, descending into sub-directories when allowed.
+  *
+  * @param ftp             the connected client used for the listing
+  * @param fileOrDirectory the remote path to list
+  * @param processDir      whether sub-directories found in this listing are
+  *                        descended into; nested levels use {@link #isRecursive()}
+  * @throws Exception if the listing fails
+  */
+ protected void pollFileOrDirectory(FTPClient ftp, String fileOrDirectory, boolean processDir) throws Exception {
+     FTPFile[] files = ftp.listFiles(fileOrDirectory);
+     if (files == null) {
+         // Nothing could be listed for this path
+         return;
+     }
+     for (int i = 0; i < files.length; i++) {
+         FTPFile entry = files[i];
+         // The listing may contain null members for lines that failed to parse
+         if (entry == null || entry.getName() == null) {
+             continue;
+         }
+         String name = entry.getName();
+         // Some servers list the current and parent directories; following
+         // them would recurse forever
+         if (".".equals(name) || "..".equals(name)) {
+             continue;
+         }
+         String file = fileOrDirectory + "/" + name;
+         if (!entry.isDirectory()) {
+             if (getFilter() == null || getFilter().accept(new File(file))) {
+                 pollFile(file); // process the file
+             }
+         } else if (processDir) {
+             logger.debug("Polling directory " + file);
+             pollFileOrDirectory(ftp, file, isRecursive());
+         } else {
+             logger.debug("Skipping directory " + file);
+         }
+     }
+ }
+
+ /**
+  * Schedules the given remote file for asynchronous processing.
+  * The task only processes the file if it can take the lock registered
+  * under the file path; otherwise the file is left alone for this poll
+  * and the failure to lock is reported at debug level.
+  *
+  * @param file the remote path of the file to process
+  */
+ protected void pollFile(final String file) {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Scheduling file " + file + " for processing");
+     }
+     getExecutor().execute(new Runnable() {
+         public void run() {
+             final Lock lock = lockManager.getLock(file);
+             if (lock.tryLock()) {
+                 try {
+                     processFileAndDelete(file);
+                 }
+                 finally {
+                     lock.unlock();
+                 }
+             } else {
+                 // Same diagnostic as the file polling endpoint emits
+                 if (logger.isDebugEnabled()) {
+                     logger.debug("Unable to acquire lock on " + file);
+                 }
+             }
+         }
+     });
+ }
+
+ /**
+ * Processes the given remote file with a client borrowed from the pool
+ * and, when {@link #isDeleteFile()} is true, deletes it from the server.
+ * Any failure is logged rather than propagated, and the client is always
+ * returned to the pool.
+ *
+ * @param file the remote path of the file to process
+ */
+ protected void processFileAndDelete(String file) {
+ FTPClient ftp = null;
+ try {
+ ftp = borrowClient();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing file " + file);
+ }
+ processFile(ftp, file);
+ if (isDeleteFile()) {
+ // A false return from deleteFile is turned into an error so it gets logged below
+ if (!ftp.deleteFile(file)) {
+ throw new IOException("Could not delete file " + file);
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.error("Failed to process file: " + file + ". Reason: " + e, e);
+ } finally {
+ // returnClient ignores a null client, so this is safe when borrowing failed
+ returnClient(ftp);
+ }
+ }
+
+ /**
+  * Retrieves the given remote file and sends its content into the bus
+  * as a synchronous InOnly exchange.
+  *
+  * @param ftp  the connected client used for the transfer
+  * @param file the remote path of the file to retrieve
+  * @throws Exception if the file can not be retrieved, the message can not
+  *                   be built or sent, or the server reports that the
+  *                   transfer did not complete successfully
+  */
+ protected void processFile(FTPClient ftp, String file) throws Exception {
+     InputStream in = ftp.retrieveFileStream(file);
+     if (in == null) {
+         // No data connection could be opened for this file
+         throw new IOException("Could not retrieve file " + file + ": " + ftp.getReplyString());
+     }
+     try {
+         InOnly exchange = getExchangeFactory().createInOnlyExchange();
+         configureExchangeTarget(exchange);
+         NormalizedMessage message = exchange.createMessage();
+         exchange.setInMessage(message);
+         marshaler.readMessage(exchange, message, in, file);
+         sendSync(exchange);
+     } finally {
+         // Close the data stream even when marshaling or sending fails
+         in.close();
+     }
+     // The transfer may only be finalized once the stream has been fully
+     // consumed and closed; calling this earlier waits on a transfer that
+     // has not been read yet
+     if (!ftp.completePendingCommand()) {
+         throw new IOException("File transfer did not complete for " + file + ": " + ftp.getReplyString());
+     }
+ }
+
+ public String getLocationURI() {
+ return uri.toString();
+ }
+
+ public void process(MessageExchange exchange) throws Exception {
+ // Do nothing. In our case, this method should never be called
+ // as we only send synchronous InOnly exchange
+ }
+
+ protected FTPClientPool createClientPool() throws Exception {
+ FTPClientPool pool = new FTPClientPool();
+ pool.afterPropertiesSet();
+ return pool;
+ }
+
+ /**
+ * Borrows an FTP client from the client pool.
+ *
+ * @return the borrowed client
+ * @throws JBIException wrapping any failure raised by the pool
+ */
+ protected FTPClient borrowClient() throws JBIException {
+ try {
+ return (FTPClient) getClientPool().borrowClient();
+ }
+ catch (Exception e) {
+ throw new JBIException(e);
+ }
+ }
+
+ /**
+ * Hands a client back to the client pool.
+ * A null client is ignored, and a failure to return the client is
+ * logged instead of being propagated.
+ *
+ * @param client the client to return, may be null
+ */
+ protected void returnClient(FTPClient client) {
+ if (client != null) {
+ try {
+ getClientPool().returnClient(client);
+ }
+ catch (Exception e) {
+ logger.error("Failed to return client to pool: " + e, e);
+ }
+ }
+ }
+
+}
Modified: incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/SocketClientPoolSupport.java Mon Oct 30 09:35:29 2006
@@ -44,7 +44,9 @@
public void afterPropertiesSet() throws Exception {
if (pool == null) {
- pool = new GenericObjectPool();
+ GenericObjectPool goPool = new GenericObjectPool();
+ goPool.setTestOnBorrow(true);
+ pool = goPool;
}
pool.setFactory(this);
}
Added: incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java?view=auto&rev=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java (added)
+++ incubator/servicemix/trunk/servicemix-ftp/src/test/java/org/apache/servicemix/ftp/PollDirectoryTest.java Mon Oct 30 09:35:29 2006
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.ftp;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.SpringTestSupport;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+
+public class PollDirectoryTest extends SpringTestSupport {
+ protected String directoryName = "target/pollDirectory";
+ protected String dynamicURI = "file:" + directoryName;
+
+ private int NUMBER = 10;
+
+ public void testSendToWriterSoItCanBePolled() throws Exception {
+ // now lets make a request on this endpoint
+ DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
+
+ // lets send a request to be written to a file
+ // which should then be polled
+ for (int i = 0; i < NUMBER; i++) {
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("urn:test", "service"));
+ NormalizedMessage message = me.getInMessage();
+ message.setProperty(DefaultFileMarshaler.FILE_NAME_PROPERTY, "test" + i + ".xml");
+ message.setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(me);
+ }
+
+ Receiver receiver = (Receiver) getBean("receiver");
+ receiver.getMessageList().assertMessagesReceived(NUMBER);
+ }
+
+ protected void assertExchangeWorked(MessageExchange me) throws Exception {
+ if (me.getStatus() == ExchangeStatus.ERROR) {
+ if (me.getError() != null) {
+ throw me.getError();
+ }
+ else {
+ fail("Received ERROR status");
+ }
+ }
+ else if (me.getFault() != null) {
+ fail("Received fault: " + new SourceTransformer().toString(me.getFault().getContent()));
+ }
+ }
+
+ protected AbstractXmlApplicationContext createBeanFactory() {
+ return new ClassPathXmlApplicationContext("spring-polling.xml");
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml?view=auto&rev=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml (added)
+++ incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring-polling.xml Mon Oct 30 09:35:29 2006
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns:sm="http://servicemix.apache.org/config/1.0"
+ xmlns:ftp="http://servicemix.apache.org/ftp/1.0"
+ xmlns:test="urn:test">
+
+ <sm:container id="jbi" embedded="true" createMBeanServer="false">
+
+ <sm:activationSpecs>
+
+ <sm:activationSpec>
+ <sm:component>
+ <ftp:component>
+ <ftp:endpoints>
+ <ftp:endpoint service="test:service"
+ endpoint="endpoint"
+ uri="ftp://servicemix:rocks@localhost/smx/test" />
+ </ftp:endpoints>
+
+ <ftp:pollingEndpoints>
+ <ftp:pollingEndpoint service="test:poller"
+ endpoint="endpoint"
+ targetService="test:receiver"
+ uri="ftp://servicemix:rocks@localhost/smx/test" />
+ </ftp:pollingEndpoints>
+ </ftp:component>
+ </sm:component>
+ </sm:activationSpec>
+
+ <sm:activationSpec id="receiver" service="test:receiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+ </sm:component>
+ </sm:activationSpec>
+
+ </sm:activationSpecs>
+ </sm:container>
+
+</beans>
Modified: incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml?view=diff&rev=469204&r1=469203&r2=469204
==============================================================================
--- incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml (original)
+++ incubator/servicemix/trunk/servicemix-ftp/src/test/resources/spring.xml Mon Oct 30 09:35:29 2006
@@ -29,8 +29,10 @@
<sm:component>
<ftp:component>
<ftp:endpoints>
- <ftp:endpoint service="test:service" endpoint="endpoint" uri="ftp://servicemix:rocks@localhost/smx/test"/>
- </ftp:endpoints>
+ <ftp:endpoint service="test:service"
+ endpoint="endpoint"
+ uri="ftp://servicemix:rocks@localhost/smx/test" />
+ </ftp:endpoints>
</ftp:component>
</sm:component>
</sm:activationSpec>