You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@servicemix.apache.org by netflexity <mf...@netflexity.com> on 2007/06/20 19:18:19 UTC

SFTP binding component with jcraft

Hello, all!
I have a working SFTP binding component. Please let me know what I need to
change to commit it.

Here is the sample poller configuration. Sender is available as well:
<sftp:poller service="sample:processSftpFile"
targetService="sample:logPollableSshFile" endpoint="soap"
uri="sftp://user:pass@sshhostname/sftp_dir">
		<sftp:clientPool>
			<ref bean="sshConnectionPool"/>
		</sftp:clientPool>
	</sftp:poller>

/**
 * A polling endpoint which looks for a file or files in a directory
 * and sends the files into the JBI bus as messages, deleting the files
 * by default when they are processed.
 *
 * @org.apache.xbean.XBean element="poller"
 *
 * @version $Revision: 468487 $
 */
public class SftpPollerEndpoint extends PollingEndpoint implements
SftpEndpointType {

    private KeyedObjectPool sshClientPool;
    private JschPoolableKey sshServerInfo;
    private FileFilter filter;
    private boolean deleteFile = true;
    private boolean recursive = true;
    private FileMarshaler marshaler = new DefaultFileMarshaler();
    private LockManager lockManager;
    private URI uri;

    public SftpPollerEndpoint() {
    }

    /**
     * @param serviceUnit
     * @param service
     * @param endpoint
     */
    public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service, String
endpoint) {
        super(serviceUnit, service, endpoint);
    }

    /**
     * @param component
     * @param endpoint
     */
    public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
endpoint) {
        super(component, endpoint);
    }

    /* (non-Javadoc)
     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
     */
    public void poll() throws Exception {
        pollFileOrDirectory(getWorkingPath());
    }

    /* (non-Javadoc)
     * @see
org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
     */
    public void validate() throws DeploymentException {
        super.validate();
        if (uri == null && (getSshServerInfo() == null ||
getSshServerInfo().getHost() == null)) {
            throw new DeploymentException("Property uri or SSH Server
Information must be configured");
        }
        if (uri != null && getSshServerInfo() != null &&
getSshServerInfo().getHost() != null) {
            throw new DeploymentException("Properties uri and SSH Server
Information can not be configured at the same time");
        }
    }
    
    /* (non-Javadoc)
     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
     */
    public void start() throws Exception {
        if (lockManager == null) {
            lockManager = createLockManager();
        }
        
        if (uri != null) {
        	String host = uri.getHost();
            int port = uri.getPort();
            String password = "";
            String username = "";
            if (uri.getUserInfo() != null) {
                String[] infos = uri.getUserInfo().split(":");
                username = infos[0];
                if (infos.length > 1) {
                    password = infos[1];
                }
            }
            sshServerInfo = new JschPoolableKey(host, port, username,
password);
        } 
        else {
            String str = "sftp://" + sshServerInfo.getHost();
            if (sshServerInfo.getPort() >= 0) {
                str += ":" + sshServerInfo.getPort();
            }
            str += "/";
            uri = new URI(str);
        }
        super.start();
    }
    
    /**
     * @return
     */
    protected LockManager createLockManager() {
        return new SimpleLockManager();
    }

    /**
     * @return
     */
    private String getWorkingPath() {
        return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
    }

    // Properties
   
//-------------------------------------------------------------------------
    /**
     * @return the clientPool
     */
    public KeyedObjectPool getClientPool() {
        return sshClientPool;
    }

    /**
     * @param clientPool the clientPool to set
     */
    public void setClientPool(KeyedObjectPool clientPool) {
        this.sshClientPool = clientPool;
    }

    /**
	 * @return the sshServerInfo
	 */
	public JschPoolableKey getSshServerInfo() {
		return sshServerInfo;
	}

	/**
	 * @param sshServerInfo the sshServerInfo to set
	 */
	public void setSshServerInfo(JschPoolableKey sshServerInfo) {
		this.sshServerInfo = sshServerInfo;
	}

	/**
     * @return the uri
     */
    public URI getUri() {
        return uri;
    }

    /**
     * @param uri the uri to set
     */
    public void setUri(URI uri) {
        this.uri = uri;
    }

    public FileFilter getFilter() {
        return filter;
    }

    /**
     * Sets the optional filter to choose which files to process
     */
    public void setFilter(FileFilter filter) {
        this.filter = filter;
    }

    /**
     * Returns whether or not we should delete the file when its processed
     */
    public boolean isDeleteFile() {
        return deleteFile;
    }

    /**
     * @param deleteFile
     */
    public void setDeleteFile(boolean deleteFile) {
        this.deleteFile = deleteFile;
    }

    /**
     * @return
     */
    public boolean isRecursive() {
        return recursive;
    }

    /**
     * @param recursive
     */
    public void setRecursive(boolean recursive) {
        this.recursive = recursive;
    }

    /**
     * @return
     */
    public FileMarshaler getMarshaler() {
        return marshaler;
    }

    /**
     * @param marshaler
     */
    public void setMarshaler(FileMarshaler marshaler) {
        this.marshaler = marshaler;
    }

    // Implementation methods
   
//-------------------------------------------------------------------------


    /**
     * @param fileOrDirectory
     * @throws Exception
     */
    protected void pollFileOrDirectory(String fileOrDirectory) throws
Exception {
    	JschSessionWrapper sftp = borrowClient();
        try {
            logger.debug("Polling directory " + fileOrDirectory);
            Channel channel = sftp.getSession().openChannel("sftp");
            channel.connect();
            ChannelSftp sftpChannel = (ChannelSftp)channel;
            pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
        }
        finally {
            returnClient(sftp);
        }
    }

    /**
     * @param sftpChannel
     * @param fileOrDirectory
     * @param processDir
     * @throws Exception
     */
    protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
fileOrDirectory, boolean processDir) throws Exception {
        Vector files = sftpChannel.ls(fileOrDirectory);
        if(files != null && !files.isEmpty()){
  	      	for(int i=0; i < files.size(); i++){
  	      		LsEntry entry = (LsEntry)files.elementAt(i);
	  	      	SftpATTRS attrs = entry.getAttrs();
	  	      	String name = entry.getLongname();
	  	      	
	  	      	// Ignore "." and ".."
	  	      	if (name.equals(".") || name.equals("..")) {
	                continue;
	            }
	            
	  	      	String file = fileOrDirectory + "/" + name;
	            if (!attrs.isDir()) {	// This is a file, process it.
	                if (getFilter() == null || getFilter().accept(new
File(file))) {
	                    pollFile(file); // Process the file.
	                }
	            } 
	            else if (processDir) { 	// Only process directories if
processDir is true
	                logger.debug("Polling directory " + file);
	                pollFileOrDirectory(sftpChannel, file, isRecursive());
	            } 
	            else {
	                logger.debug("Skipping directory " + file);
	            }
  	      	}
  	    }
    }

    /**
     * @param file
     */
    protected void pollFile(final String file) {
        logger.debug("Scheduling file " + file + " for processing");
        getExecutor().execute(new Runnable() {
            public void run() {
                final Lock lock = lockManager.getLock(file);
                if (lock.tryLock()) {
                    boolean unlock = true;
                    try {
                        unlock = processFileAndDelete(file);
                    }
                    finally {
                        if (unlock) {
                            lock.unlock();
                        }
                    }
                }
            }
        });
    }

    /**
     * @param file
     * @return
     */
    protected boolean processFileAndDelete(String file) {
    	JschSessionWrapper sftp = null;
        boolean unlock = true;
        try {
        	sftp = borrowClient();
        	Channel channel = sftp.getSession().openChannel("sftp");
            channel.connect();
            ChannelSftp sftpChannel = (ChannelSftp)channel;
            
            // Process the file. If processing fails, an exception should be
thrown.
            logger.debug("Processing file " + file);
            processFile(sftpChannel, file);
            
            // Processing is succesfull, we should not unlock until the file
has been deleted.
            unlock = false;
            if (isDeleteFile()) {
                sftpChannel.rm(file);
                unlock = true;
            }
        }
        catch (Exception e) {
            logger.error("Failed to process file: " + file + ". Reason: " +
e, e);
        } 
        finally {
            returnClient(sftp);
        }
        return unlock;
    }

    /**
     * @param sftpChannel
     * @param file
     * @throws Exception
     */
    protected void processFile(ChannelSftp sftpChannel, String file) throws
Exception {
        InputStream in = sftpChannel.get(file);
        InOnly exchange = getExchangeFactory().createInOnlyExchange();
        configureExchangeTarget(exchange);
        NormalizedMessage message = exchange.createMessage();
        exchange.setInMessage(message);
        marshaler.readMessage(exchange, message, in, file);
        sendSync(exchange);
        in.close();
        if (exchange.getStatus() == ExchangeStatus.ERROR) {
            Exception e = exchange.getError();
            if (e == null) {
                e = new JBIException("Unkown error");
            }
            throw e;
        }
    }

    /* (non-Javadoc)
     * @see
org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
     */
    public String getLocationURI() {
        return uri.toString();
    }

    /* (non-Javadoc)
     * @see
org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
     */
    public void process(MessageExchange exchange) throws Exception {
        // Do nothing. In our case, this method should never be called
        // as we only send synchronous InOnly exchange
    }
    
    /**
     * Retrieve ssh session from a pool.
     * 
     * @return
     * @throws JBIException
     */
    protected JschSessionWrapper borrowClient() throws JBIException {
        JschSessionWrapper session = null;
        try {
            session =
(JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
        }
        catch (Exception e) {
            throw new JBIException("Failed to retrieve Jsch session from
pool.", e);
        }
        
        if (session != null && session.getSession().isConnected()) {
            logger.info("Connected to jsch session at " +
session.getSession().getUserName() + "@" + session.getSession().getHost() +
":" + session.getSession().getPort());
        }
        return session;
    }

    /**
     * @param client
     */
    protected void returnClient(JschSessionWrapper client) {
        if (client != null) {
            try {
                getClientPool().returnObject(client, sshServerInfo);
            }
            catch (Exception e) {
                logger.error("Failed to return client to pool: " + e, e);
            }
        }
    }
}
-- 
View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
Sent from the ServiceMix - User mailing list archive at Nabble.com.


Re: SFTP binding component with jcraft

Posted by netflexity <mf...@netflexity.com>.
JIRA opened: https://issues.apache.org/activemq/browse/SM-974

Thank you.
-Max


netflexity wrote:
> 
> Thanks, will do.
> 
> 
> gnodet wrote:
>> 
>> This is message for servicemix-dev btw.
>> The code looks good.  Would you mind creating a JIRA and attach a patch
>> or
>> zip with the complete project ?
>> AFAIK, the license for jcraft is BSD, so there should be no problem on
>> this
>> side.
>> 
>> On 6/20/07, netflexity <mf...@netflexity.com> wrote:
>>>
>>>
>>> Hello, all!
>>> I have a working SFTP binding component. Please let me know what I need
>>> to
>>> change to commit it.
>>>
>>> Here is the sample poller configuration. Sender is available as well:
>>> <sftp:poller service="sample:processSftpFile"
>>> targetService="sample:logPollableSshFile" endpoint="soap"
>>> uri="sftp://user:pass@sshhostname/sftp_dir">
>>>                 <sftp:clientPool>
>>>                         <ref bean="sshConnectionPool"/>
>>>                 </sftp:clientPool>
>>>         </sftp:poller>
>>>
>>> /**
>>> * A polling endpoint which looks for a file or files in a directory
>>> * and sends the files into the JBI bus as messages, deleting the files
>>> * by default when they are processed.
>>> *
>>> * @org.apache.xbean.XBean element="poller"
>>> *
>>> * @version $Revision: 468487 $
>>> */
>>> public class SftpPollerEndpoint extends PollingEndpoint implements
>>> SftpEndpointType {
>>>
>>>     private KeyedObjectPool sshClientPool;
>>>     private JschPoolableKey sshServerInfo;
>>>     private FileFilter filter;
>>>     private boolean deleteFile = true;
>>>     private boolean recursive = true;
>>>     private FileMarshaler marshaler = new DefaultFileMarshaler();
>>>     private LockManager lockManager;
>>>     private URI uri;
>>>
>>>     public SftpPollerEndpoint() {
>>>     }
>>>
>>>     /**
>>>      * @param serviceUnit
>>>      * @param service
>>>      * @param endpoint
>>>      */
>>>     public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service,
>>> String
>>> endpoint) {
>>>         super(serviceUnit, service, endpoint);
>>>     }
>>>
>>>     /**
>>>      * @param component
>>>      * @param endpoint
>>>      */
>>>     public SftpPollerEndpoint(DefaultComponent component,
>>> ServiceEndpoint
>>> endpoint) {
>>>         super(component, endpoint);
>>>     }
>>>
>>>     /* (non-Javadoc)
>>>      * @see
>>> org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
>>>      */
>>>     public void poll() throws Exception {
>>>         pollFileOrDirectory(getWorkingPath());
>>>     }
>>>
>>>     /* (non-Javadoc)
>>>      * @see
>>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
>>>      */
>>>     public void validate() throws DeploymentException {
>>>         super.validate();
>>>         if (uri == null && (getSshServerInfo() == null ||
>>> getSshServerInfo().getHost() == null)) {
>>>             throw new DeploymentException("Property uri or SSH Server
>>> Information must be configured");
>>>         }
>>>         if (uri != null && getSshServerInfo() != null &&
>>> getSshServerInfo().getHost() != null) {
>>>             throw new DeploymentException("Properties uri and SSH Server
>>> Information can not be configured at the same time");
>>>         }
>>>     }
>>>
>>>     /* (non-Javadoc)
>>>      * @see
>>> org.apache.servicemix.common.endpoints.PollingEndpoint#start()
>>>      */
>>>     public void start() throws Exception {
>>>         if (lockManager == null) {
>>>             lockManager = createLockManager();
>>>         }
>>>
>>>         if (uri != null) {
>>>                 String host = uri.getHost();
>>>             int port = uri.getPort();
>>>             String password = "";
>>>             String username = "";
>>>             if (uri.getUserInfo() != null) {
>>>                 String[] infos = uri.getUserInfo().split(":");
>>>                 username = infos[0];
>>>                 if (infos.length > 1) {
>>>                     password = infos[1];
>>>                 }
>>>             }
>>>             sshServerInfo = new JschPoolableKey(host, port, username,
>>> password);
>>>         }
>>>         else {
>>>             String str = "sftp://" + sshServerInfo.getHost();
>>>             if (sshServerInfo.getPort() >= 0) {
>>>                 str += ":" + sshServerInfo.getPort();
>>>             }
>>>             str += "/";
>>>             uri = new URI(str);
>>>         }
>>>         super.start();
>>>     }
>>>
>>>     /**
>>>      * @return
>>>      */
>>>     protected LockManager createLockManager() {
>>>         return new SimpleLockManager();
>>>     }
>>>
>>>     /**
>>>      * @return
>>>      */
>>>     private String getWorkingPath() {
>>>         return (uri != null && uri.getPath() != null) ? uri.getPath() :
>>> ".";
>>>     }
>>>
>>>     // Properties
>>>
>>>
>>> //-------------------------------------------------------------------------
>>>     /**
>>>      * @return the clientPool
>>>      */
>>>     public KeyedObjectPool getClientPool() {
>>>         return sshClientPool;
>>>     }
>>>
>>>     /**
>>>      * @param clientPool the clientPool to set
>>>      */
>>>     public void setClientPool(KeyedObjectPool clientPool) {
>>>         this.sshClientPool = clientPool;
>>>     }
>>>
>>>     /**
>>>          * @return the sshServerInfo
>>>          */
>>>         public JschPoolableKey getSshServerInfo() {
>>>                 return sshServerInfo;
>>>         }
>>>
>>>         /**
>>>          * @param sshServerInfo the sshServerInfo to set
>>>          */
>>>         public void setSshServerInfo(JschPoolableKey sshServerInfo) {
>>>                 this.sshServerInfo = sshServerInfo;
>>>         }
>>>
>>>         /**
>>>      * @return the uri
>>>      */
>>>     public URI getUri() {
>>>         return uri;
>>>     }
>>>
>>>     /**
>>>      * @param uri the uri to set
>>>      */
>>>     public void setUri(URI uri) {
>>>         this.uri = uri;
>>>     }
>>>
>>>     public FileFilter getFilter() {
>>>         return filter;
>>>     }
>>>
>>>     /**
>>>      * Sets the optional filter to choose which files to process
>>>      */
>>>     public void setFilter(FileFilter filter) {
>>>         this.filter = filter;
>>>     }
>>>
>>>     /**
>>>      * Returns whether or not we should delete the file when its
>>> processed
>>>      */
>>>     public boolean isDeleteFile() {
>>>         return deleteFile;
>>>     }
>>>
>>>     /**
>>>      * @param deleteFile
>>>      */
>>>     public void setDeleteFile(boolean deleteFile) {
>>>         this.deleteFile = deleteFile;
>>>     }
>>>
>>>     /**
>>>      * @return
>>>      */
>>>     public boolean isRecursive() {
>>>         return recursive;
>>>     }
>>>
>>>     /**
>>>      * @param recursive
>>>      */
>>>     public void setRecursive(boolean recursive) {
>>>         this.recursive = recursive;
>>>     }
>>>
>>>     /**
>>>      * @return
>>>      */
>>>     public FileMarshaler getMarshaler() {
>>>         return marshaler;
>>>     }
>>>
>>>     /**
>>>      * @param marshaler
>>>      */
>>>     public void setMarshaler(FileMarshaler marshaler) {
>>>         this.marshaler = marshaler;
>>>     }
>>>
>>>     // Implementation methods
>>>
>>>
>>> //-------------------------------------------------------------------------
>>>
>>>
>>>     /**
>>>      * @param fileOrDirectory
>>>      * @throws Exception
>>>      */
>>>     protected void pollFileOrDirectory(String fileOrDirectory) throws
>>> Exception {
>>>         JschSessionWrapper sftp = borrowClient();
>>>         try {
>>>             logger.debug("Polling directory " + fileOrDirectory);
>>>             Channel channel = sftp.getSession().openChannel("sftp");
>>>             channel.connect();
>>>             ChannelSftp sftpChannel = (ChannelSftp)channel;
>>>             pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
>>>         }
>>>         finally {
>>>             returnClient(sftp);
>>>         }
>>>     }
>>>
>>>     /**
>>>      * @param sftpChannel
>>>      * @param fileOrDirectory
>>>      * @param processDir
>>>      * @throws Exception
>>>      */
>>>     protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
>>> fileOrDirectory, boolean processDir) throws Exception {
>>>         Vector files = sftpChannel.ls(fileOrDirectory);
>>>         if(files != null && !files.isEmpty()){
>>>                 for(int i=0; i < files.size(); i++){
>>>                         LsEntry entry = (LsEntry)files.elementAt(i);
>>>                         SftpATTRS attrs = entry.getAttrs();
>>>                         String name = entry.getLongname();
>>>
>>>                         // Ignore "." and ".."
>>>                         if (name.equals(".") || name.equals("..")) {
>>>                         continue;
>>>                     }
>>>
>>>                         String file = fileOrDirectory + "/" + name;
>>>                     if (!attrs.isDir()) {       // This is a file,
>>> process
>>> it.
>>>                         if (getFilter() == null ||
>>> getFilter().accept(new
>>> File(file))) {
>>>                             pollFile(file); // Process the file.
>>>                         }
>>>                     }
>>>                     else if (processDir) {      // Only process
>>> directories if
>>> processDir is true
>>>                         logger.debug("Polling directory " + file);
>>>                         pollFileOrDirectory(sftpChannel, file,
>>> isRecursive());
>>>                     }
>>>                     else {
>>>                         logger.debug("Skipping directory " + file);
>>>                     }
>>>                 }
>>>             }
>>>     }
>>>
>>>     /**
>>>      * @param file
>>>      */
>>>     protected void pollFile(final String file) {
>>>         logger.debug("Scheduling file " + file + " for processing");
>>>         getExecutor().execute(new Runnable() {
>>>             public void run() {
>>>                 final Lock lock = lockManager.getLock(file);
>>>                 if (lock.tryLock()) {
>>>                     boolean unlock = true;
>>>                     try {
>>>                         unlock = processFileAndDelete(file);
>>>                     }
>>>                     finally {
>>>                         if (unlock) {
>>>                             lock.unlock();
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         });
>>>     }
>>>
>>>     /**
>>>      * @param file
>>>      * @return
>>>      */
>>>     protected boolean processFileAndDelete(String file) {
>>>         JschSessionWrapper sftp = null;
>>>         boolean unlock = true;
>>>         try {
>>>                 sftp = borrowClient();
>>>                 Channel channel = sftp.getSession().openChannel("sftp");
>>>             channel.connect();
>>>             ChannelSftp sftpChannel = (ChannelSftp)channel;
>>>
>>>             // Process the file. If processing fails, an exception
>>> should
>>> be
>>> thrown.
>>>             logger.debug("Processing file " + file);
>>>             processFile(sftpChannel, file);
>>>
>>>             // Processing is succesfull, we should not unlock until the
>>> file
>>> has been deleted.
>>>             unlock = false;
>>>             if (isDeleteFile()) {
>>>                 sftpChannel.rm(file);
>>>                 unlock = true;
>>>             }
>>>         }
>>>         catch (Exception e) {
>>>             logger.error("Failed to process file: " + file + ". Reason:
>>> "
>>> +
>>> e, e);
>>>         }
>>>         finally {
>>>             returnClient(sftp);
>>>         }
>>>         return unlock;
>>>     }
>>>
>>>     /**
>>>      * @param sftpChannel
>>>      * @param file
>>>      * @throws Exception
>>>      */
>>>     protected void processFile(ChannelSftp sftpChannel, String file)
>>> throws
>>> Exception {
>>>         InputStream in = sftpChannel.get(file);
>>>         InOnly exchange = getExchangeFactory().createInOnlyExchange();
>>>         configureExchangeTarget(exchange);
>>>         NormalizedMessage message = exchange.createMessage();
>>>         exchange.setInMessage(message);
>>>         marshaler.readMessage(exchange, message, in, file);
>>>         sendSync(exchange);
>>>         in.close();
>>>         if (exchange.getStatus() == ExchangeStatus.ERROR) {
>>>             Exception e = exchange.getError();
>>>             if (e == null) {
>>>                 e = new JBIException("Unkown error");
>>>             }
>>>             throw e;
>>>         }
>>>     }
>>>
>>>     /* (non-Javadoc)
>>>      * @see
>>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
>>>      */
>>>     public String getLocationURI() {
>>>         return uri.toString();
>>>     }
>>>
>>>     /* (non-Javadoc)
>>>      * @see
>>> org.apache.servicemix.common.ExchangeProcessor#process(
>>> javax.jbi.messaging.MessageExchange)
>>>      */
>>>     public void process(MessageExchange exchange) throws Exception {
>>>         // Do nothing. In our case, this method should never be called
>>>         // as we only send synchronous InOnly exchange
>>>     }
>>>
>>>     /**
>>>      * Retrieve ssh session from a pool.
>>>      *
>>>      * @return
>>>      * @throws JBIException
>>>      */
>>>     protected JschSessionWrapper borrowClient() throws JBIException {
>>>         JschSessionWrapper session = null;
>>>         try {
>>>             session =
>>> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
>>>         }
>>>         catch (Exception e) {
>>>             throw new JBIException("Failed to retrieve Jsch session from
>>> pool.", e);
>>>         }
>>>
>>>         if (session != null && session.getSession().isConnected()) {
>>>             logger.info("Connected to jsch session at " +
>>> session.getSession().getUserName() + "@" +
>>> session.getSession().getHost()
>>> +
>>> ":" + session.getSession().getPort());
>>>         }
>>>         return session;
>>>     }
>>>
>>>     /**
>>>      * @param client
>>>      */
>>>     protected void returnClient(JschSessionWrapper client) {
>>>         if (client != null) {
>>>             try {
>>>                 getClientPool().returnObject(client, sshServerInfo);
>>>             }
>>>             catch (Exception e) {
>>>                 logger.error("Failed to return client to pool: " + e,
>>> e);
>>>             }
>>>         }
>>>     }
>>> }
>>> --
>>> View this message in context:
>>> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
>>> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>>>
>>>
>> 
>> 
>> -- 
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Principal Engineer, IONA
>> Blog: http://gnodet.blogspot.com/
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11541824
Sent from the ServiceMix - User mailing list archive at Nabble.com.


Re: SFTP binding component with jcraft

Posted by netflexity <mf...@netflexity.com>.
Thanks, will do.


gnodet wrote:
> 
> This is message for servicemix-dev btw.
> The code looks good.  Would you mind creating a JIRA and attach a patch or
> zip with the complete project ?
> AFAIK, the license for jcraft is BSD, so there should be no problem on
> this
> side.
> 
> On 6/20/07, netflexity <mf...@netflexity.com> wrote:
>>
>>
>> Hello, all!
>> I have a working SFTP binding component. Please let me know what I need
>> to
>> change to commit it.
>>
>> Here is the sample poller configuration. Sender is available as well:
>> <sftp:poller service="sample:processSftpFile"
>> targetService="sample:logPollableSshFile" endpoint="soap"
>> uri="sftp://user:pass@sshhostname/sftp_dir">
>>                 <sftp:clientPool>
>>                         <ref bean="sshConnectionPool"/>
>>                 </sftp:clientPool>
>>         </sftp:poller>
>>
>> /**
>> * A polling endpoint which looks for a file or files in a directory
>> * and sends the files into the JBI bus as messages, deleting the files
>> * by default when they are processed.
>> *
>> * @org.apache.xbean.XBean element="poller"
>> *
>> * @version $Revision: 468487 $
>> */
>> public class SftpPollerEndpoint extends PollingEndpoint implements
>> SftpEndpointType {
>>
>>     private KeyedObjectPool sshClientPool;
>>     private JschPoolableKey sshServerInfo;
>>     private FileFilter filter;
>>     private boolean deleteFile = true;
>>     private boolean recursive = true;
>>     private FileMarshaler marshaler = new DefaultFileMarshaler();
>>     private LockManager lockManager;
>>     private URI uri;
>>
>>     public SftpPollerEndpoint() {
>>     }
>>
>>     /**
>>      * @param serviceUnit
>>      * @param service
>>      * @param endpoint
>>      */
>>     public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service,
>> String
>> endpoint) {
>>         super(serviceUnit, service, endpoint);
>>     }
>>
>>     /**
>>      * @param component
>>      * @param endpoint
>>      */
>>     public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
>> endpoint) {
>>         super(component, endpoint);
>>     }
>>
>>     /* (non-Javadoc)
>>      * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
>>      */
>>     public void poll() throws Exception {
>>         pollFileOrDirectory(getWorkingPath());
>>     }
>>
>>     /* (non-Javadoc)
>>      * @see
>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
>>      */
>>     public void validate() throws DeploymentException {
>>         super.validate();
>>         if (uri == null && (getSshServerInfo() == null ||
>> getSshServerInfo().getHost() == null)) {
>>             throw new DeploymentException("Property uri or SSH Server
>> Information must be configured");
>>         }
>>         if (uri != null && getSshServerInfo() != null &&
>> getSshServerInfo().getHost() != null) {
>>             throw new DeploymentException("Properties uri and SSH Server
>> Information can not be configured at the same time");
>>         }
>>     }
>>
>>     /* (non-Javadoc)
>>      * @see
>> org.apache.servicemix.common.endpoints.PollingEndpoint#start()
>>      */
>>     public void start() throws Exception {
>>         if (lockManager == null) {
>>             lockManager = createLockManager();
>>         }
>>
>>         if (uri != null) {
>>                 String host = uri.getHost();
>>             int port = uri.getPort();
>>             String password = "";
>>             String username = "";
>>             if (uri.getUserInfo() != null) {
>>                 String[] infos = uri.getUserInfo().split(":");
>>                 username = infos[0];
>>                 if (infos.length > 1) {
>>                     password = infos[1];
>>                 }
>>             }
>>             sshServerInfo = new JschPoolableKey(host, port, username,
>> password);
>>         }
>>         else {
>>             String str = "sftp://" + sshServerInfo.getHost();
>>             if (sshServerInfo.getPort() >= 0) {
>>                 str += ":" + sshServerInfo.getPort();
>>             }
>>             str += "/";
>>             uri = new URI(str);
>>         }
>>         super.start();
>>     }
>>
>>     /**
>>      * @return
>>      */
>>     protected LockManager createLockManager() {
>>         return new SimpleLockManager();
>>     }
>>
>>     /**
>>      * @return
>>      */
>>     private String getWorkingPath() {
>>         return (uri != null && uri.getPath() != null) ? uri.getPath() :
>> ".";
>>     }
>>
>>     // Properties
>>
>>
>> //-------------------------------------------------------------------------
>>     /**
>>      * @return the clientPool
>>      */
>>     public KeyedObjectPool getClientPool() {
>>         return sshClientPool;
>>     }
>>
>>     /**
>>      * @param clientPool the clientPool to set
>>      */
>>     public void setClientPool(KeyedObjectPool clientPool) {
>>         this.sshClientPool = clientPool;
>>     }
>>
>>     /**
>>          * @return the sshServerInfo
>>          */
>>         public JschPoolableKey getSshServerInfo() {
>>                 return sshServerInfo;
>>         }
>>
>>         /**
>>          * @param sshServerInfo the sshServerInfo to set
>>          */
>>         public void setSshServerInfo(JschPoolableKey sshServerInfo) {
>>                 this.sshServerInfo = sshServerInfo;
>>         }
>>
>>         /**
>>      * @return the uri
>>      */
>>     public URI getUri() {
>>         return uri;
>>     }
>>
>>     /**
>>      * @param uri the uri to set
>>      */
>>     public void setUri(URI uri) {
>>         this.uri = uri;
>>     }
>>
>>     public FileFilter getFilter() {
>>         return filter;
>>     }
>>
>>     /**
>>      * Sets the optional filter to choose which files to process
>>      */
>>     public void setFilter(FileFilter filter) {
>>         this.filter = filter;
>>     }
>>
>>     /**
>>      * Returns whether or not we should delete the file when its
>> processed
>>      */
>>     public boolean isDeleteFile() {
>>         return deleteFile;
>>     }
>>
>>     /**
>>      * @param deleteFile
>>      */
>>     public void setDeleteFile(boolean deleteFile) {
>>         this.deleteFile = deleteFile;
>>     }
>>
>>     /**
>>      * @return
>>      */
>>     public boolean isRecursive() {
>>         return recursive;
>>     }
>>
>>     /**
>>      * @param recursive
>>      */
>>     public void setRecursive(boolean recursive) {
>>         this.recursive = recursive;
>>     }
>>
>>     /**
>>      * @return
>>      */
>>     public FileMarshaler getMarshaler() {
>>         return marshaler;
>>     }
>>
>>     /**
>>      * @param marshaler
>>      */
>>     public void setMarshaler(FileMarshaler marshaler) {
>>         this.marshaler = marshaler;
>>     }
>>
>>     // Implementation methods
>>
>>
>> //-------------------------------------------------------------------------
>>
>>
>>     /**
>>      * @param fileOrDirectory
>>      * @throws Exception
>>      */
>>     protected void pollFileOrDirectory(String fileOrDirectory) throws
>> Exception {
>>         JschSessionWrapper sftp = borrowClient();
>>         try {
>>             logger.debug("Polling directory " + fileOrDirectory);
>>             Channel channel = sftp.getSession().openChannel("sftp");
>>             channel.connect();
>>             ChannelSftp sftpChannel = (ChannelSftp)channel;
>>             pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
>>         }
>>         finally {
>>             returnClient(sftp);
>>         }
>>     }
>>
>>     /**
>>      * @param sftpChannel
>>      * @param fileOrDirectory
>>      * @param processDir
>>      * @throws Exception
>>      */
>>     protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
>> fileOrDirectory, boolean processDir) throws Exception {
>>         Vector files = sftpChannel.ls(fileOrDirectory);
>>         if(files != null && !files.isEmpty()){
>>                 for(int i=0; i < files.size(); i++){
>>                         LsEntry entry = (LsEntry)files.elementAt(i);
>>                         SftpATTRS attrs = entry.getAttrs();
>>                         String name = entry.getLongname();
>>
>>                         // Ignore "." and ".."
>>                         if (name.equals(".") || name.equals("..")) {
>>                         continue;
>>                     }
>>
>>                         String file = fileOrDirectory + "/" + name;
>>                     if (!attrs.isDir()) {       // This is a file,
>> process
>> it.
>>                         if (getFilter() == null || getFilter().accept(new
>> File(file))) {
>>                             pollFile(file); // Process the file.
>>                         }
>>                     }
>>                     else if (processDir) {      // Only process
>> directories if
>> processDir is true
>>                         logger.debug("Polling directory " + file);
>>                         pollFileOrDirectory(sftpChannel, file,
>> isRecursive());
>>                     }
>>                     else {
>>                         logger.debug("Skipping directory " + file);
>>                     }
>>                 }
>>             }
>>     }
>>
>>     /**
>>      * @param file
>>      */
>>     protected void pollFile(final String file) {
>>         logger.debug("Scheduling file " + file + " for processing");
>>         getExecutor().execute(new Runnable() {
>>             public void run() {
>>                 final Lock lock = lockManager.getLock(file);
>>                 if (lock.tryLock()) {
>>                     boolean unlock = true;
>>                     try {
>>                         unlock = processFileAndDelete(file);
>>                     }
>>                     finally {
>>                         if (unlock) {
>>                             lock.unlock();
>>                         }
>>                     }
>>                 }
>>             }
>>         });
>>     }
>>
>>     /**
>>      * @param file
>>      * @return
>>      */
>>     protected boolean processFileAndDelete(String file) {
>>         JschSessionWrapper sftp = null;
>>         boolean unlock = true;
>>         try {
>>                 sftp = borrowClient();
>>                 Channel channel = sftp.getSession().openChannel("sftp");
>>             channel.connect();
>>             ChannelSftp sftpChannel = (ChannelSftp)channel;
>>
>>             // Process the file. If processing fails, an exception should
>> be
>> thrown.
>>             logger.debug("Processing file " + file);
>>             processFile(sftpChannel, file);
>>
>>             // Processing is succesfull, we should not unlock until the
>> file
>> has been deleted.
>>             unlock = false;
>>             if (isDeleteFile()) {
>>                 sftpChannel.rm(file);
>>                 unlock = true;
>>             }
>>         }
>>         catch (Exception e) {
>>             logger.error("Failed to process file: " + file + ". Reason: "
>> +
>> e, e);
>>         }
>>         finally {
>>             returnClient(sftp);
>>         }
>>         return unlock;
>>     }
>>
>>     /**
>>      * @param sftpChannel
>>      * @param file
>>      * @throws Exception
>>      */
>>     protected void processFile(ChannelSftp sftpChannel, String file)
>> throws
>> Exception {
>>         InputStream in = sftpChannel.get(file);
>>         InOnly exchange = getExchangeFactory().createInOnlyExchange();
>>         configureExchangeTarget(exchange);
>>         NormalizedMessage message = exchange.createMessage();
>>         exchange.setInMessage(message);
>>         marshaler.readMessage(exchange, message, in, file);
>>         sendSync(exchange);
>>         in.close();
>>         if (exchange.getStatus() == ExchangeStatus.ERROR) {
>>             Exception e = exchange.getError();
>>             if (e == null) {
>>                 e = new JBIException("Unkown error");
>>             }
>>             throw e;
>>         }
>>     }
>>
>>     /* (non-Javadoc)
>>      * @see
>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
>>      */
>>     public String getLocationURI() {
>>         return uri.toString();
>>     }
>>
>>     /* (non-Javadoc)
>>      * @see
>> org.apache.servicemix.common.ExchangeProcessor#process(
>> javax.jbi.messaging.MessageExchange)
>>      */
>>     public void process(MessageExchange exchange) throws Exception {
>>         // Do nothing. In our case, this method should never be called
>>         // as we only send synchronous InOnly exchange
>>     }
>>
>>     /**
>>      * Retrieve ssh session from a pool.
>>      *
>>      * @return
>>      * @throws JBIException
>>      */
>>     protected JschSessionWrapper borrowClient() throws JBIException {
>>         JschSessionWrapper session = null;
>>         try {
>>             session =
>> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
>>         }
>>         catch (Exception e) {
>>             throw new JBIException("Failed to retrieve Jsch session from
>> pool.", e);
>>         }
>>
>>         if (session != null && session.getSession().isConnected()) {
>>             logger.info("Connected to jsch session at " +
>> session.getSession().getUserName() + "@" + session.getSession().getHost()
>> +
>> ":" + session.getSession().getPort());
>>         }
>>         return session;
>>     }
>>
>>     /**
>>      * @param client
>>      */
>>     protected void returnClient(JschSessionWrapper client) {
>>         if (client != null) {
>>             try {
>>                 getClientPool().returnObject(client, sshServerInfo);
>>             }
>>             catch (Exception e) {
>>                 logger.error("Failed to return client to pool: " + e, e);
>>             }
>>         }
>>     }
>> }
>> --
>> View this message in context:
>> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
>> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Principal Engineer, IONA
> Blog: http://gnodet.blogspot.com/
> 
> 

-- 
View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11220100
Sent from the ServiceMix - User mailing list archive at Nabble.com.


Re: SFTP binding component with jcraft

Posted by Guillaume Nodet <gn...@gmail.com>.
This is a message for servicemix-dev, btw.
The code looks good.  Would you mind creating a JIRA issue and attaching a patch or
zip with the complete project?
AFAIK, the license for jcraft is BSD, so there should be no problem on this
side.

On 6/20/07, netflexity <mf...@netflexity.com> wrote:
>
>
> Hello, all!
> I have a working SFTP binding component. Please let me know what I need to
> change to commit it.
>
> Here is the sample poller configuration. Sender is available as well:
> <sftp:poller service="sample:processSftpFile"
> targetService="sample:logPollableSshFile" endpoint="soap"
> uri="sftp://user:pass@sshhostname/sftp_dir">
>                 <sftp:clientPool>
>                         <ref bean="sshConnectionPool"/>
>                 </sftp:clientPool>
>         </sftp:poller>
>
> /**
> * A polling endpoint which looks for a file or files in a directory
> * and sends the files into the JBI bus as messages, deleting the files
> * by default when they are processed.
> *
> * @org.apache.xbean.XBean element="poller"
> *
> * @version $Revision: 468487 $
> */
> public class SftpPollerEndpoint extends PollingEndpoint implements
> SftpEndpointType {
>
>     private KeyedObjectPool sshClientPool;
>     private JschPoolableKey sshServerInfo;
>     private FileFilter filter;
>     private boolean deleteFile = true;
>     private boolean recursive = true;
>     private FileMarshaler marshaler = new DefaultFileMarshaler();
>     private LockManager lockManager;
>     private URI uri;
>
>     public SftpPollerEndpoint() {
>     }
>
>     /**
>      * @param serviceUnit
>      * @param service
>      * @param endpoint
>      */
>     public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service,
> String
> endpoint) {
>         super(serviceUnit, service, endpoint);
>     }
>
>     /**
>      * @param component
>      * @param endpoint
>      */
>     public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
> endpoint) {
>         super(component, endpoint);
>     }
>
>     /* (non-Javadoc)
>      * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
>      */
>     public void poll() throws Exception {
>         pollFileOrDirectory(getWorkingPath());
>     }
>
>     /* (non-Javadoc)
>      * @see
> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
>      */
>     public void validate() throws DeploymentException {
>         super.validate();
>         if (uri == null && (getSshServerInfo() == null ||
> getSshServerInfo().getHost() == null)) {
>             throw new DeploymentException("Property uri or SSH Server
> Information must be configured");
>         }
>         if (uri != null && getSshServerInfo() != null &&
> getSshServerInfo().getHost() != null) {
>             throw new DeploymentException("Properties uri and SSH Server
> Information can not be configured at the same time");
>         }
>     }
>
>     /* (non-Javadoc)
>      * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
>      */
>     public void start() throws Exception {
>         if (lockManager == null) {
>             lockManager = createLockManager();
>         }
>
>         if (uri != null) {
>                 String host = uri.getHost();
>             int port = uri.getPort();
>             String password = "";
>             String username = "";
>             if (uri.getUserInfo() != null) {
>                 String[] infos = uri.getUserInfo().split(":");
>                 username = infos[0];
>                 if (infos.length > 1) {
>                     password = infos[1];
>                 }
>             }
>             sshServerInfo = new JschPoolableKey(host, port, username,
> password);
>         }
>         else {
>             String str = "sftp://" + sshServerInfo.getHost();
>             if (sshServerInfo.getPort() >= 0) {
>                 str += ":" + sshServerInfo.getPort();
>             }
>             str += "/";
>             uri = new URI(str);
>         }
>         super.start();
>     }
>
>     /**
>      * @return
>      */
>     protected LockManager createLockManager() {
>         return new SimpleLockManager();
>     }
>
>     /**
>      * @return
>      */
>     private String getWorkingPath() {
>         return (uri != null && uri.getPath() != null) ? uri.getPath() :
> ".";
>     }
>
>     // Properties
>
>
> //-------------------------------------------------------------------------
>     /**
>      * @return the clientPool
>      */
>     public KeyedObjectPool getClientPool() {
>         return sshClientPool;
>     }
>
>     /**
>      * @param clientPool the clientPool to set
>      */
>     public void setClientPool(KeyedObjectPool clientPool) {
>         this.sshClientPool = clientPool;
>     }
>
>     /**
>          * @return the sshServerInfo
>          */
>         public JschPoolableKey getSshServerInfo() {
>                 return sshServerInfo;
>         }
>
>         /**
>          * @param sshServerInfo the sshServerInfo to set
>          */
>         public void setSshServerInfo(JschPoolableKey sshServerInfo) {
>                 this.sshServerInfo = sshServerInfo;
>         }
>
>         /**
>      * @return the uri
>      */
>     public URI getUri() {
>         return uri;
>     }
>
>     /**
>      * @param uri the uri to set
>      */
>     public void setUri(URI uri) {
>         this.uri = uri;
>     }
>
>     public FileFilter getFilter() {
>         return filter;
>     }
>
>     /**
>      * Sets the optional filter to choose which files to process
>      */
>     public void setFilter(FileFilter filter) {
>         this.filter = filter;
>     }
>
>     /**
>      * Returns whether or not we should delete the file when its processed
>      */
>     public boolean isDeleteFile() {
>         return deleteFile;
>     }
>
>     /**
>      * @param deleteFile
>      */
>     public void setDeleteFile(boolean deleteFile) {
>         this.deleteFile = deleteFile;
>     }
>
>     /**
>      * @return
>      */
>     public boolean isRecursive() {
>         return recursive;
>     }
>
>     /**
>      * @param recursive
>      */
>     public void setRecursive(boolean recursive) {
>         this.recursive = recursive;
>     }
>
>     /**
>      * @return
>      */
>     public FileMarshaler getMarshaler() {
>         return marshaler;
>     }
>
>     /**
>      * @param marshaler
>      */
>     public void setMarshaler(FileMarshaler marshaler) {
>         this.marshaler = marshaler;
>     }
>
>     // Implementation methods
>
>
> //-------------------------------------------------------------------------
>
>
>     /**
>      * @param fileOrDirectory
>      * @throws Exception
>      */
>     protected void pollFileOrDirectory(String fileOrDirectory) throws
> Exception {
>         JschSessionWrapper sftp = borrowClient();
>         try {
>             logger.debug("Polling directory " + fileOrDirectory);
>             Channel channel = sftp.getSession().openChannel("sftp");
>             channel.connect();
>             ChannelSftp sftpChannel = (ChannelSftp)channel;
>             pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
>         }
>         finally {
>             returnClient(sftp);
>         }
>     }
>
>     /**
>      * @param sftpChannel
>      * @param fileOrDirectory
>      * @param processDir
>      * @throws Exception
>      */
>     protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
> fileOrDirectory, boolean processDir) throws Exception {
>         Vector files = sftpChannel.ls(fileOrDirectory);
>         if(files != null && !files.isEmpty()){
>                 for(int i=0; i < files.size(); i++){
>                         LsEntry entry = (LsEntry)files.elementAt(i);
>                         SftpATTRS attrs = entry.getAttrs();
>                         String name = entry.getLongname();
>
>                         // Ignore "." and ".."
>                         if (name.equals(".") || name.equals("..")) {
>                         continue;
>                     }
>
>                         String file = fileOrDirectory + "/" + name;
>                     if (!attrs.isDir()) {       // This is a file, process
> it.
>                         if (getFilter() == null || getFilter().accept(new
> File(file))) {
>                             pollFile(file); // Process the file.
>                         }
>                     }
>                     else if (processDir) {      // Only process
> directories if
> processDir is true
>                         logger.debug("Polling directory " + file);
>                         pollFileOrDirectory(sftpChannel, file,
> isRecursive());
>                     }
>                     else {
>                         logger.debug("Skipping directory " + file);
>                     }
>                 }
>             }
>     }
>
>     /**
>      * @param file
>      */
>     protected void pollFile(final String file) {
>         logger.debug("Scheduling file " + file + " for processing");
>         getExecutor().execute(new Runnable() {
>             public void run() {
>                 final Lock lock = lockManager.getLock(file);
>                 if (lock.tryLock()) {
>                     boolean unlock = true;
>                     try {
>                         unlock = processFileAndDelete(file);
>                     }
>                     finally {
>                         if (unlock) {
>                             lock.unlock();
>                         }
>                     }
>                 }
>             }
>         });
>     }
>
>     /**
>      * @param file
>      * @return
>      */
>     protected boolean processFileAndDelete(String file) {
>         JschSessionWrapper sftp = null;
>         boolean unlock = true;
>         try {
>                 sftp = borrowClient();
>                 Channel channel = sftp.getSession().openChannel("sftp");
>             channel.connect();
>             ChannelSftp sftpChannel = (ChannelSftp)channel;
>
>             // Process the file. If processing fails, an exception should
> be
> thrown.
>             logger.debug("Processing file " + file);
>             processFile(sftpChannel, file);
>
>             // Processing is succesfull, we should not unlock until the
> file
> has been deleted.
>             unlock = false;
>             if (isDeleteFile()) {
>                 sftpChannel.rm(file);
>                 unlock = true;
>             }
>         }
>         catch (Exception e) {
>             logger.error("Failed to process file: " + file + ". Reason: "
> +
> e, e);
>         }
>         finally {
>             returnClient(sftp);
>         }
>         return unlock;
>     }
>
>     /**
>      * @param sftpChannel
>      * @param file
>      * @throws Exception
>      */
>     protected void processFile(ChannelSftp sftpChannel, String file)
> throws
> Exception {
>         InputStream in = sftpChannel.get(file);
>         InOnly exchange = getExchangeFactory().createInOnlyExchange();
>         configureExchangeTarget(exchange);
>         NormalizedMessage message = exchange.createMessage();
>         exchange.setInMessage(message);
>         marshaler.readMessage(exchange, message, in, file);
>         sendSync(exchange);
>         in.close();
>         if (exchange.getStatus() == ExchangeStatus.ERROR) {
>             Exception e = exchange.getError();
>             if (e == null) {
>                 e = new JBIException("Unkown error");
>             }
>             throw e;
>         }
>     }
>
>     /* (non-Javadoc)
>      * @see
> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
>      */
>     public String getLocationURI() {
>         return uri.toString();
>     }
>
>     /* (non-Javadoc)
>      * @see
> org.apache.servicemix.common.ExchangeProcessor#process(
> javax.jbi.messaging.MessageExchange)
>      */
>     public void process(MessageExchange exchange) throws Exception {
>         // Do nothing. In our case, this method should never be called
>         // as we only send synchronous InOnly exchange
>     }
>
>     /**
>      * Retrieve ssh session from a pool.
>      *
>      * @return
>      * @throws JBIException
>      */
>     protected JschSessionWrapper borrowClient() throws JBIException {
>         JschSessionWrapper session = null;
>         try {
>             session =
> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
>         }
>         catch (Exception e) {
>             throw new JBIException("Failed to retrieve Jsch session from
> pool.", e);
>         }
>
>         if (session != null && session.getSession().isConnected()) {
>             logger.info("Connected to jsch session at " +
> session.getSession().getUserName() + "@" + session.getSession().getHost()
> +
> ":" + session.getSession().getPort());
>         }
>         return session;
>     }
>
>     /**
>      * @param client
>      */
>     protected void returnClient(JschSessionWrapper client) {
>         if (client != null) {
>             try {
>                 getClientPool().returnObject(client, sshServerInfo);
>             }
>             catch (Exception e) {
>                 logger.error("Failed to return client to pool: " + e, e);
>             }
>         }
>     }
> }
> --
> View this message in context:
> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>
>


-- 
Cheers,
Guillaume Nodet
------------------------
Principal Engineer, IONA
Blog: http://gnodet.blogspot.com/