You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@servicemix.apache.org by netflexity <mf...@netflexity.com> on 2007/06/20 19:18:19 UTC
SFTP binding component with jcraft
Hello, all!
I have a working SFTP binding component. Please let me know what I need to
change to commit it.
Here is the sample poller configuration. Sender is available as well:
<sftp:poller service="sample:processSftpFile"
targetService="sample:logPollableSshFile" endpoint="soap"
uri="sftp://user:pass@sshhostname/sftp_dir">
<sftp:clientPool>
<ref bean="sshConnectionPool"/>
</sftp:clientPool>
</sftp:poller>
/**
* A polling endpoint which looks for a file or files in a directory
* and sends the files into the JBI bus as messages, deleting the files
* by default when they are processed.
*
* @org.apache.xbean.XBean element="poller"
*
* @version $Revision: 468487 $
*/
public class SftpPollerEndpoint extends PollingEndpoint implements
SftpEndpointType {
private KeyedObjectPool sshClientPool;
private JschPoolableKey sshServerInfo;
private FileFilter filter;
private boolean deleteFile = true;
private boolean recursive = true;
private FileMarshaler marshaler = new DefaultFileMarshaler();
private LockManager lockManager;
private URI uri;
public SftpPollerEndpoint() {
}
/**
* @param serviceUnit
* @param service
* @param endpoint
*/
public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service, String
endpoint) {
super(serviceUnit, service, endpoint);
}
/**
* @param component
* @param endpoint
*/
public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
endpoint) {
super(component, endpoint);
}
/* (non-Javadoc)
* @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
*/
public void poll() throws Exception {
pollFileOrDirectory(getWorkingPath());
}
/* (non-Javadoc)
* @see
org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
*/
public void validate() throws DeploymentException {
super.validate();
if (uri == null && (getSshServerInfo() == null ||
getSshServerInfo().getHost() == null)) {
throw new DeploymentException("Property uri or SSH Server
Information must be configured");
}
if (uri != null && getSshServerInfo() != null &&
getSshServerInfo().getHost() != null) {
throw new DeploymentException("Properties uri and SSH Server
Information can not be configured at the same time");
}
}
/* (non-Javadoc)
* @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
*/
public void start() throws Exception {
if (lockManager == null) {
lockManager = createLockManager();
}
if (uri != null) {
String host = uri.getHost();
int port = uri.getPort();
String password = "";
String username = "";
if (uri.getUserInfo() != null) {
String[] infos = uri.getUserInfo().split(":");
username = infos[0];
if (infos.length > 1) {
password = infos[1];
}
}
sshServerInfo = new JschPoolableKey(host, port, username,
password);
}
else {
String str = "sftp://" + sshServerInfo.getHost();
if (sshServerInfo.getPort() >= 0) {
str += ":" + sshServerInfo.getPort();
}
str += "/";
uri = new URI(str);
}
super.start();
}
/**
* @return
*/
protected LockManager createLockManager() {
return new SimpleLockManager();
}
/**
* @return
*/
private String getWorkingPath() {
return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
}
// Properties
//-------------------------------------------------------------------------
/**
* @return the clientPool
*/
public KeyedObjectPool getClientPool() {
return sshClientPool;
}
/**
* @param clientPool the clientPool to set
*/
public void setClientPool(KeyedObjectPool clientPool) {
this.sshClientPool = clientPool;
}
/**
* @return the sshServerInfo
*/
public JschPoolableKey getSshServerInfo() {
return sshServerInfo;
}
/**
* @param sshServerInfo the sshServerInfo to set
*/
public void setSshServerInfo(JschPoolableKey sshServerInfo) {
this.sshServerInfo = sshServerInfo;
}
/**
* @return the uri
*/
public URI getUri() {
return uri;
}
/**
* @param uri the uri to set
*/
public void setUri(URI uri) {
this.uri = uri;
}
public FileFilter getFilter() {
return filter;
}
/**
* Sets the optional filter to choose which files to process
*/
public void setFilter(FileFilter filter) {
this.filter = filter;
}
/**
* Returns whether or not we should delete the file when its processed
*/
public boolean isDeleteFile() {
return deleteFile;
}
/**
* @param deleteFile
*/
public void setDeleteFile(boolean deleteFile) {
this.deleteFile = deleteFile;
}
/**
* @return
*/
public boolean isRecursive() {
return recursive;
}
/**
* @param recursive
*/
public void setRecursive(boolean recursive) {
this.recursive = recursive;
}
/**
* @return
*/
public FileMarshaler getMarshaler() {
return marshaler;
}
/**
* @param marshaler
*/
public void setMarshaler(FileMarshaler marshaler) {
this.marshaler = marshaler;
}
// Implementation methods
//-------------------------------------------------------------------------
/**
* @param fileOrDirectory
* @throws Exception
*/
protected void pollFileOrDirectory(String fileOrDirectory) throws
Exception {
JschSessionWrapper sftp = borrowClient();
try {
logger.debug("Polling directory " + fileOrDirectory);
Channel channel = sftp.getSession().openChannel("sftp");
channel.connect();
ChannelSftp sftpChannel = (ChannelSftp)channel;
pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
}
finally {
returnClient(sftp);
}
}
/**
* @param sftpChannel
* @param fileOrDirectory
* @param processDir
* @throws Exception
*/
protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
fileOrDirectory, boolean processDir) throws Exception {
Vector files = sftpChannel.ls(fileOrDirectory);
if(files != null && !files.isEmpty()){
for(int i=0; i < files.size(); i++){
LsEntry entry = (LsEntry)files.elementAt(i);
SftpATTRS attrs = entry.getAttrs();
String name = entry.getLongname();
// Ignore "." and ".."
if (name.equals(".") || name.equals("..")) {
continue;
}
String file = fileOrDirectory + "/" + name;
if (!attrs.isDir()) { // This is a file, process it.
if (getFilter() == null || getFilter().accept(new
File(file))) {
pollFile(file); // Process the file.
}
}
else if (processDir) { // Only process directories if
processDir is true
logger.debug("Polling directory " + file);
pollFileOrDirectory(sftpChannel, file, isRecursive());
}
else {
logger.debug("Skipping directory " + file);
}
}
}
}
/**
* @param file
*/
protected void pollFile(final String file) {
logger.debug("Scheduling file " + file + " for processing");
getExecutor().execute(new Runnable() {
public void run() {
final Lock lock = lockManager.getLock(file);
if (lock.tryLock()) {
boolean unlock = true;
try {
unlock = processFileAndDelete(file);
}
finally {
if (unlock) {
lock.unlock();
}
}
}
}
});
}
/**
* @param file
* @return
*/
protected boolean processFileAndDelete(String file) {
JschSessionWrapper sftp = null;
boolean unlock = true;
try {
sftp = borrowClient();
Channel channel = sftp.getSession().openChannel("sftp");
channel.connect();
ChannelSftp sftpChannel = (ChannelSftp)channel;
// Process the file. If processing fails, an exception should be
thrown.
logger.debug("Processing file " + file);
processFile(sftpChannel, file);
// Processing is succesfull, we should not unlock until the file
has been deleted.
unlock = false;
if (isDeleteFile()) {
sftpChannel.rm(file);
unlock = true;
}
}
catch (Exception e) {
logger.error("Failed to process file: " + file + ". Reason: " +
e, e);
}
finally {
returnClient(sftp);
}
return unlock;
}
/**
* @param sftpChannel
* @param file
* @throws Exception
*/
protected void processFile(ChannelSftp sftpChannel, String file) throws
Exception {
InputStream in = sftpChannel.get(file);
InOnly exchange = getExchangeFactory().createInOnlyExchange();
configureExchangeTarget(exchange);
NormalizedMessage message = exchange.createMessage();
exchange.setInMessage(message);
marshaler.readMessage(exchange, message, in, file);
sendSync(exchange);
in.close();
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Exception e = exchange.getError();
if (e == null) {
e = new JBIException("Unkown error");
}
throw e;
}
}
/* (non-Javadoc)
* @see
org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
*/
public String getLocationURI() {
return uri.toString();
}
/* (non-Javadoc)
* @see
org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
*/
public void process(MessageExchange exchange) throws Exception {
// Do nothing. In our case, this method should never be called
// as we only send synchronous InOnly exchange
}
/**
* Retrieve ssh session from a pool.
*
* @return
* @throws JBIException
*/
protected JschSessionWrapper borrowClient() throws JBIException {
JschSessionWrapper session = null;
try {
session =
(JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
}
catch (Exception e) {
throw new JBIException("Failed to retrieve Jsch session from
pool.", e);
}
if (session != null && session.getSession().isConnected()) {
logger.info("Connected to jsch session at " +
session.getSession().getUserName() + "@" + session.getSession().getHost() +
":" + session.getSession().getPort());
}
return session;
}
/**
* @param client
*/
protected void returnClient(JschSessionWrapper client) {
if (client != null) {
try {
getClientPool().returnObject(client, sshServerInfo);
}
catch (Exception e) {
logger.error("Failed to return client to pool: " + e, e);
}
}
}
}
--
View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
Sent from the ServiceMix - User mailing list archive at Nabble.com.
Re: SFTP binding component with jcraft
Posted by netflexity <mf...@netflexity.com>.
JIRA opened: https://issues.apache.org/activemq/browse/SM-974
Thank you.
-Max
netflexity wrote:
>
> Thanks, will do.
>
>
> gnodet wrote:
>>
>> This is message for servicemix-dev btw.
>> The code looks good. Would you mind creating a JIRA and attach a patch
>> or
>> zip with the complete project ?
>> AFAIK, the license for jcraft is BSD, so there should be no problem on
>> this
>> side.
>>
>> On 6/20/07, netflexity <mf...@netflexity.com> wrote:
>>>
>>>
>>> Hello, all!
>>> I have a working SFTP binding component. Please let me know what I need
>>> to
>>> change to commit it.
>>>
>>> Here is the sample poller configuration. Sender is available as well:
>>> <sftp:poller service="sample:processSftpFile"
>>> targetService="sample:logPollableSshFile" endpoint="soap"
>>> uri="sftp://user:pass@sshhostname/sftp_dir">
>>> <sftp:clientPool>
>>> <ref bean="sshConnectionPool"/>
>>> </sftp:clientPool>
>>> </sftp:poller>
>>>
>>> /**
>>> * A polling endpoint which looks for a file or files in a directory
>>> * and sends the files into the JBI bus as messages, deleting the files
>>> * by default when they are processed.
>>> *
>>> * @org.apache.xbean.XBean element="poller"
>>> *
>>> * @version $Revision: 468487 $
>>> */
>>> public class SftpPollerEndpoint extends PollingEndpoint implements
>>> SftpEndpointType {
>>>
>>> private KeyedObjectPool sshClientPool;
>>> private JschPoolableKey sshServerInfo;
>>> private FileFilter filter;
>>> private boolean deleteFile = true;
>>> private boolean recursive = true;
>>> private FileMarshaler marshaler = new DefaultFileMarshaler();
>>> private LockManager lockManager;
>>> private URI uri;
>>>
>>> public SftpPollerEndpoint() {
>>> }
>>>
>>> /**
>>> * @param serviceUnit
>>> * @param service
>>> * @param endpoint
>>> */
>>> public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service,
>>> String
>>> endpoint) {
>>> super(serviceUnit, service, endpoint);
>>> }
>>>
>>> /**
>>> * @param component
>>> * @param endpoint
>>> */
>>> public SftpPollerEndpoint(DefaultComponent component,
>>> ServiceEndpoint
>>> endpoint) {
>>> super(component, endpoint);
>>> }
>>>
>>> /* (non-Javadoc)
>>> * @see
>>> org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
>>> */
>>> public void poll() throws Exception {
>>> pollFileOrDirectory(getWorkingPath());
>>> }
>>>
>>> /* (non-Javadoc)
>>> * @see
>>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
>>> */
>>> public void validate() throws DeploymentException {
>>> super.validate();
>>> if (uri == null && (getSshServerInfo() == null ||
>>> getSshServerInfo().getHost() == null)) {
>>> throw new DeploymentException("Property uri or SSH Server
>>> Information must be configured");
>>> }
>>> if (uri != null && getSshServerInfo() != null &&
>>> getSshServerInfo().getHost() != null) {
>>> throw new DeploymentException("Properties uri and SSH Server
>>> Information can not be configured at the same time");
>>> }
>>> }
>>>
>>> /* (non-Javadoc)
>>> * @see
>>> org.apache.servicemix.common.endpoints.PollingEndpoint#start()
>>> */
>>> public void start() throws Exception {
>>> if (lockManager == null) {
>>> lockManager = createLockManager();
>>> }
>>>
>>> if (uri != null) {
>>> String host = uri.getHost();
>>> int port = uri.getPort();
>>> String password = "";
>>> String username = "";
>>> if (uri.getUserInfo() != null) {
>>> String[] infos = uri.getUserInfo().split(":");
>>> username = infos[0];
>>> if (infos.length > 1) {
>>> password = infos[1];
>>> }
>>> }
>>> sshServerInfo = new JschPoolableKey(host, port, username,
>>> password);
>>> }
>>> else {
>>> String str = "sftp://" + sshServerInfo.getHost();
>>> if (sshServerInfo.getPort() >= 0) {
>>> str += ":" + sshServerInfo.getPort();
>>> }
>>> str += "/";
>>> uri = new URI(str);
>>> }
>>> super.start();
>>> }
>>>
>>> /**
>>> * @return
>>> */
>>> protected LockManager createLockManager() {
>>> return new SimpleLockManager();
>>> }
>>>
>>> /**
>>> * @return
>>> */
>>> private String getWorkingPath() {
>>> return (uri != null && uri.getPath() != null) ? uri.getPath() :
>>> ".";
>>> }
>>>
>>> // Properties
>>>
>>>
>>> //-------------------------------------------------------------------------
>>> /**
>>> * @return the clientPool
>>> */
>>> public KeyedObjectPool getClientPool() {
>>> return sshClientPool;
>>> }
>>>
>>> /**
>>> * @param clientPool the clientPool to set
>>> */
>>> public void setClientPool(KeyedObjectPool clientPool) {
>>> this.sshClientPool = clientPool;
>>> }
>>>
>>> /**
>>> * @return the sshServerInfo
>>> */
>>> public JschPoolableKey getSshServerInfo() {
>>> return sshServerInfo;
>>> }
>>>
>>> /**
>>> * @param sshServerInfo the sshServerInfo to set
>>> */
>>> public void setSshServerInfo(JschPoolableKey sshServerInfo) {
>>> this.sshServerInfo = sshServerInfo;
>>> }
>>>
>>> /**
>>> * @return the uri
>>> */
>>> public URI getUri() {
>>> return uri;
>>> }
>>>
>>> /**
>>> * @param uri the uri to set
>>> */
>>> public void setUri(URI uri) {
>>> this.uri = uri;
>>> }
>>>
>>> public FileFilter getFilter() {
>>> return filter;
>>> }
>>>
>>> /**
>>> * Sets the optional filter to choose which files to process
>>> */
>>> public void setFilter(FileFilter filter) {
>>> this.filter = filter;
>>> }
>>>
>>> /**
>>> * Returns whether or not we should delete the file when its
>>> processed
>>> */
>>> public boolean isDeleteFile() {
>>> return deleteFile;
>>> }
>>>
>>> /**
>>> * @param deleteFile
>>> */
>>> public void setDeleteFile(boolean deleteFile) {
>>> this.deleteFile = deleteFile;
>>> }
>>>
>>> /**
>>> * @return
>>> */
>>> public boolean isRecursive() {
>>> return recursive;
>>> }
>>>
>>> /**
>>> * @param recursive
>>> */
>>> public void setRecursive(boolean recursive) {
>>> this.recursive = recursive;
>>> }
>>>
>>> /**
>>> * @return
>>> */
>>> public FileMarshaler getMarshaler() {
>>> return marshaler;
>>> }
>>>
>>> /**
>>> * @param marshaler
>>> */
>>> public void setMarshaler(FileMarshaler marshaler) {
>>> this.marshaler = marshaler;
>>> }
>>>
>>> // Implementation methods
>>>
>>>
>>> //-------------------------------------------------------------------------
>>>
>>>
>>> /**
>>> * @param fileOrDirectory
>>> * @throws Exception
>>> */
>>> protected void pollFileOrDirectory(String fileOrDirectory) throws
>>> Exception {
>>> JschSessionWrapper sftp = borrowClient();
>>> try {
>>> logger.debug("Polling directory " + fileOrDirectory);
>>> Channel channel = sftp.getSession().openChannel("sftp");
>>> channel.connect();
>>> ChannelSftp sftpChannel = (ChannelSftp)channel;
>>> pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
>>> }
>>> finally {
>>> returnClient(sftp);
>>> }
>>> }
>>>
>>> /**
>>> * @param sftpChannel
>>> * @param fileOrDirectory
>>> * @param processDir
>>> * @throws Exception
>>> */
>>> protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
>>> fileOrDirectory, boolean processDir) throws Exception {
>>> Vector files = sftpChannel.ls(fileOrDirectory);
>>> if(files != null && !files.isEmpty()){
>>> for(int i=0; i < files.size(); i++){
>>> LsEntry entry = (LsEntry)files.elementAt(i);
>>> SftpATTRS attrs = entry.getAttrs();
>>> String name = entry.getLongname();
>>>
>>> // Ignore "." and ".."
>>> if (name.equals(".") || name.equals("..")) {
>>> continue;
>>> }
>>>
>>> String file = fileOrDirectory + "/" + name;
>>> if (!attrs.isDir()) { // This is a file,
>>> process
>>> it.
>>> if (getFilter() == null ||
>>> getFilter().accept(new
>>> File(file))) {
>>> pollFile(file); // Process the file.
>>> }
>>> }
>>> else if (processDir) { // Only process
>>> directories if
>>> processDir is true
>>> logger.debug("Polling directory " + file);
>>> pollFileOrDirectory(sftpChannel, file,
>>> isRecursive());
>>> }
>>> else {
>>> logger.debug("Skipping directory " + file);
>>> }
>>> }
>>> }
>>> }
>>>
>>> /**
>>> * @param file
>>> */
>>> protected void pollFile(final String file) {
>>> logger.debug("Scheduling file " + file + " for processing");
>>> getExecutor().execute(new Runnable() {
>>> public void run() {
>>> final Lock lock = lockManager.getLock(file);
>>> if (lock.tryLock()) {
>>> boolean unlock = true;
>>> try {
>>> unlock = processFileAndDelete(file);
>>> }
>>> finally {
>>> if (unlock) {
>>> lock.unlock();
>>> }
>>> }
>>> }
>>> }
>>> });
>>> }
>>>
>>> /**
>>> * @param file
>>> * @return
>>> */
>>> protected boolean processFileAndDelete(String file) {
>>> JschSessionWrapper sftp = null;
>>> boolean unlock = true;
>>> try {
>>> sftp = borrowClient();
>>> Channel channel = sftp.getSession().openChannel("sftp");
>>> channel.connect();
>>> ChannelSftp sftpChannel = (ChannelSftp)channel;
>>>
>>> // Process the file. If processing fails, an exception
>>> should
>>> be
>>> thrown.
>>> logger.debug("Processing file " + file);
>>> processFile(sftpChannel, file);
>>>
>>> // Processing is succesfull, we should not unlock until the
>>> file
>>> has been deleted.
>>> unlock = false;
>>> if (isDeleteFile()) {
>>> sftpChannel.rm(file);
>>> unlock = true;
>>> }
>>> }
>>> catch (Exception e) {
>>> logger.error("Failed to process file: " + file + ". Reason:
>>> "
>>> +
>>> e, e);
>>> }
>>> finally {
>>> returnClient(sftp);
>>> }
>>> return unlock;
>>> }
>>>
>>> /**
>>> * @param sftpChannel
>>> * @param file
>>> * @throws Exception
>>> */
>>> protected void processFile(ChannelSftp sftpChannel, String file)
>>> throws
>>> Exception {
>>> InputStream in = sftpChannel.get(file);
>>> InOnly exchange = getExchangeFactory().createInOnlyExchange();
>>> configureExchangeTarget(exchange);
>>> NormalizedMessage message = exchange.createMessage();
>>> exchange.setInMessage(message);
>>> marshaler.readMessage(exchange, message, in, file);
>>> sendSync(exchange);
>>> in.close();
>>> if (exchange.getStatus() == ExchangeStatus.ERROR) {
>>> Exception e = exchange.getError();
>>> if (e == null) {
>>> e = new JBIException("Unkown error");
>>> }
>>> throw e;
>>> }
>>> }
>>>
>>> /* (non-Javadoc)
>>> * @see
>>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
>>> */
>>> public String getLocationURI() {
>>> return uri.toString();
>>> }
>>>
>>> /* (non-Javadoc)
>>> * @see
>>> org.apache.servicemix.common.ExchangeProcessor#process(
>>> javax.jbi.messaging.MessageExchange)
>>> */
>>> public void process(MessageExchange exchange) throws Exception {
>>> // Do nothing. In our case, this method should never be called
>>> // as we only send synchronous InOnly exchange
>>> }
>>>
>>> /**
>>> * Retrieve ssh session from a pool.
>>> *
>>> * @return
>>> * @throws JBIException
>>> */
>>> protected JschSessionWrapper borrowClient() throws JBIException {
>>> JschSessionWrapper session = null;
>>> try {
>>> session =
>>> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
>>> }
>>> catch (Exception e) {
>>> throw new JBIException("Failed to retrieve Jsch session from
>>> pool.", e);
>>> }
>>>
>>> if (session != null && session.getSession().isConnected()) {
>>> logger.info("Connected to jsch session at " +
>>> session.getSession().getUserName() + "@" +
>>> session.getSession().getHost()
>>> +
>>> ":" + session.getSession().getPort());
>>> }
>>> return session;
>>> }
>>>
>>> /**
>>> * @param client
>>> */
>>> protected void returnClient(JschSessionWrapper client) {
>>> if (client != null) {
>>> try {
>>> getClientPool().returnObject(client, sshServerInfo);
>>> }
>>> catch (Exception e) {
>>> logger.error("Failed to return client to pool: " + e,
>>> e);
>>> }
>>> }
>>> }
>>> }
>>> --
>>> View this message in context:
>>> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
>>> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>> --
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Principal Engineer, IONA
>> Blog: http://gnodet.blogspot.com/
>>
>>
>
>
--
View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11541824
Sent from the ServiceMix - User mailing list archive at Nabble.com.
Re: SFTP binding component with jcraft
Posted by netflexity <mf...@netflexity.com>.
Thanks, will do.
gnodet wrote:
>
> This is message for servicemix-dev btw.
> The code looks good. Would you mind creating a JIRA and attach a patch or
> zip with the complete project ?
> AFAIK, the license for jcraft is BSD, so there should be no problem on
> this
> side.
>
> On 6/20/07, netflexity <mf...@netflexity.com> wrote:
>>
>>
>> Hello, all!
>> I have a working SFTP binding component. Please let me know what I need
>> to
>> change to commit it.
>>
>> Here is the sample poller configuration. Sender is available as well:
>> <sftp:poller service="sample:processSftpFile"
>> targetService="sample:logPollableSshFile" endpoint="soap"
>> uri="sftp://user:pass@sshhostname/sftp_dir">
>> <sftp:clientPool>
>> <ref bean="sshConnectionPool"/>
>> </sftp:clientPool>
>> </sftp:poller>
>>
>> /**
>> * A polling endpoint which looks for a file or files in a directory
>> * and sends the files into the JBI bus as messages, deleting the files
>> * by default when they are processed.
>> *
>> * @org.apache.xbean.XBean element="poller"
>> *
>> * @version $Revision: 468487 $
>> */
>> public class SftpPollerEndpoint extends PollingEndpoint implements
>> SftpEndpointType {
>>
>> private KeyedObjectPool sshClientPool;
>> private JschPoolableKey sshServerInfo;
>> private FileFilter filter;
>> private boolean deleteFile = true;
>> private boolean recursive = true;
>> private FileMarshaler marshaler = new DefaultFileMarshaler();
>> private LockManager lockManager;
>> private URI uri;
>>
>> public SftpPollerEndpoint() {
>> }
>>
>> /**
>> * @param serviceUnit
>> * @param service
>> * @param endpoint
>> */
>> public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service,
>> String
>> endpoint) {
>> super(serviceUnit, service, endpoint);
>> }
>>
>> /**
>> * @param component
>> * @param endpoint
>> */
>> public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
>> endpoint) {
>> super(component, endpoint);
>> }
>>
>> /* (non-Javadoc)
>> * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
>> */
>> public void poll() throws Exception {
>> pollFileOrDirectory(getWorkingPath());
>> }
>>
>> /* (non-Javadoc)
>> * @see
>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
>> */
>> public void validate() throws DeploymentException {
>> super.validate();
>> if (uri == null && (getSshServerInfo() == null ||
>> getSshServerInfo().getHost() == null)) {
>> throw new DeploymentException("Property uri or SSH Server
>> Information must be configured");
>> }
>> if (uri != null && getSshServerInfo() != null &&
>> getSshServerInfo().getHost() != null) {
>> throw new DeploymentException("Properties uri and SSH Server
>> Information can not be configured at the same time");
>> }
>> }
>>
>> /* (non-Javadoc)
>> * @see
>> org.apache.servicemix.common.endpoints.PollingEndpoint#start()
>> */
>> public void start() throws Exception {
>> if (lockManager == null) {
>> lockManager = createLockManager();
>> }
>>
>> if (uri != null) {
>> String host = uri.getHost();
>> int port = uri.getPort();
>> String password = "";
>> String username = "";
>> if (uri.getUserInfo() != null) {
>> String[] infos = uri.getUserInfo().split(":");
>> username = infos[0];
>> if (infos.length > 1) {
>> password = infos[1];
>> }
>> }
>> sshServerInfo = new JschPoolableKey(host, port, username,
>> password);
>> }
>> else {
>> String str = "sftp://" + sshServerInfo.getHost();
>> if (sshServerInfo.getPort() >= 0) {
>> str += ":" + sshServerInfo.getPort();
>> }
>> str += "/";
>> uri = new URI(str);
>> }
>> super.start();
>> }
>>
>> /**
>> * @return
>> */
>> protected LockManager createLockManager() {
>> return new SimpleLockManager();
>> }
>>
>> /**
>> * @return
>> */
>> private String getWorkingPath() {
>> return (uri != null && uri.getPath() != null) ? uri.getPath() :
>> ".";
>> }
>>
>> // Properties
>>
>>
>> //-------------------------------------------------------------------------
>> /**
>> * @return the clientPool
>> */
>> public KeyedObjectPool getClientPool() {
>> return sshClientPool;
>> }
>>
>> /**
>> * @param clientPool the clientPool to set
>> */
>> public void setClientPool(KeyedObjectPool clientPool) {
>> this.sshClientPool = clientPool;
>> }
>>
>> /**
>> * @return the sshServerInfo
>> */
>> public JschPoolableKey getSshServerInfo() {
>> return sshServerInfo;
>> }
>>
>> /**
>> * @param sshServerInfo the sshServerInfo to set
>> */
>> public void setSshServerInfo(JschPoolableKey sshServerInfo) {
>> this.sshServerInfo = sshServerInfo;
>> }
>>
>> /**
>> * @return the uri
>> */
>> public URI getUri() {
>> return uri;
>> }
>>
>> /**
>> * @param uri the uri to set
>> */
>> public void setUri(URI uri) {
>> this.uri = uri;
>> }
>>
>> public FileFilter getFilter() {
>> return filter;
>> }
>>
>> /**
>> * Sets the optional filter to choose which files to process
>> */
>> public void setFilter(FileFilter filter) {
>> this.filter = filter;
>> }
>>
>> /**
>> * Returns whether or not we should delete the file when its
>> processed
>> */
>> public boolean isDeleteFile() {
>> return deleteFile;
>> }
>>
>> /**
>> * @param deleteFile
>> */
>> public void setDeleteFile(boolean deleteFile) {
>> this.deleteFile = deleteFile;
>> }
>>
>> /**
>> * @return
>> */
>> public boolean isRecursive() {
>> return recursive;
>> }
>>
>> /**
>> * @param recursive
>> */
>> public void setRecursive(boolean recursive) {
>> this.recursive = recursive;
>> }
>>
>> /**
>> * @return
>> */
>> public FileMarshaler getMarshaler() {
>> return marshaler;
>> }
>>
>> /**
>> * @param marshaler
>> */
>> public void setMarshaler(FileMarshaler marshaler) {
>> this.marshaler = marshaler;
>> }
>>
>> // Implementation methods
>>
>>
>> //-------------------------------------------------------------------------
>>
>>
>> /**
>> * @param fileOrDirectory
>> * @throws Exception
>> */
>> protected void pollFileOrDirectory(String fileOrDirectory) throws
>> Exception {
>> JschSessionWrapper sftp = borrowClient();
>> try {
>> logger.debug("Polling directory " + fileOrDirectory);
>> Channel channel = sftp.getSession().openChannel("sftp");
>> channel.connect();
>> ChannelSftp sftpChannel = (ChannelSftp)channel;
>> pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
>> }
>> finally {
>> returnClient(sftp);
>> }
>> }
>>
>> /**
>> * @param sftpChannel
>> * @param fileOrDirectory
>> * @param processDir
>> * @throws Exception
>> */
>> protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
>> fileOrDirectory, boolean processDir) throws Exception {
>> Vector files = sftpChannel.ls(fileOrDirectory);
>> if(files != null && !files.isEmpty()){
>> for(int i=0; i < files.size(); i++){
>> LsEntry entry = (LsEntry)files.elementAt(i);
>> SftpATTRS attrs = entry.getAttrs();
>> String name = entry.getLongname();
>>
>> // Ignore "." and ".."
>> if (name.equals(".") || name.equals("..")) {
>> continue;
>> }
>>
>> String file = fileOrDirectory + "/" + name;
>> if (!attrs.isDir()) { // This is a file,
>> process
>> it.
>> if (getFilter() == null || getFilter().accept(new
>> File(file))) {
>> pollFile(file); // Process the file.
>> }
>> }
>> else if (processDir) { // Only process
>> directories if
>> processDir is true
>> logger.debug("Polling directory " + file);
>> pollFileOrDirectory(sftpChannel, file,
>> isRecursive());
>> }
>> else {
>> logger.debug("Skipping directory " + file);
>> }
>> }
>> }
>> }
>>
>> /**
>> * @param file
>> */
>> protected void pollFile(final String file) {
>> logger.debug("Scheduling file " + file + " for processing");
>> getExecutor().execute(new Runnable() {
>> public void run() {
>> final Lock lock = lockManager.getLock(file);
>> if (lock.tryLock()) {
>> boolean unlock = true;
>> try {
>> unlock = processFileAndDelete(file);
>> }
>> finally {
>> if (unlock) {
>> lock.unlock();
>> }
>> }
>> }
>> }
>> });
>> }
>>
>> /**
>> * @param file
>> * @return
>> */
>> protected boolean processFileAndDelete(String file) {
>> JschSessionWrapper sftp = null;
>> boolean unlock = true;
>> try {
>> sftp = borrowClient();
>> Channel channel = sftp.getSession().openChannel("sftp");
>> channel.connect();
>> ChannelSftp sftpChannel = (ChannelSftp)channel;
>>
>> // Process the file. If processing fails, an exception should
>> be
>> thrown.
>> logger.debug("Processing file " + file);
>> processFile(sftpChannel, file);
>>
>> // Processing is succesfull, we should not unlock until the
>> file
>> has been deleted.
>> unlock = false;
>> if (isDeleteFile()) {
>> sftpChannel.rm(file);
>> unlock = true;
>> }
>> }
>> catch (Exception e) {
>> logger.error("Failed to process file: " + file + ". Reason: "
>> +
>> e, e);
>> }
>> finally {
>> returnClient(sftp);
>> }
>> return unlock;
>> }
>>
>> /**
>> * @param sftpChannel
>> * @param file
>> * @throws Exception
>> */
>> protected void processFile(ChannelSftp sftpChannel, String file)
>> throws
>> Exception {
>> InputStream in = sftpChannel.get(file);
>> InOnly exchange = getExchangeFactory().createInOnlyExchange();
>> configureExchangeTarget(exchange);
>> NormalizedMessage message = exchange.createMessage();
>> exchange.setInMessage(message);
>> marshaler.readMessage(exchange, message, in, file);
>> sendSync(exchange);
>> in.close();
>> if (exchange.getStatus() == ExchangeStatus.ERROR) {
>> Exception e = exchange.getError();
>> if (e == null) {
>> e = new JBIException("Unkown error");
>> }
>> throw e;
>> }
>> }
>>
>> /* (non-Javadoc)
>> * @see
>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
>> */
>> public String getLocationURI() {
>> return uri.toString();
>> }
>>
>> /* (non-Javadoc)
>> * @see
>> org.apache.servicemix.common.ExchangeProcessor#process(
>> javax.jbi.messaging.MessageExchange)
>> */
>> public void process(MessageExchange exchange) throws Exception {
>> // Do nothing. In our case, this method should never be called
>> // as we only send synchronous InOnly exchange
>> }
>>
>> /**
>> * Retrieve ssh session from a pool.
>> *
>> * @return
>> * @throws JBIException
>> */
>> protected JschSessionWrapper borrowClient() throws JBIException {
>> JschSessionWrapper session = null;
>> try {
>> session =
>> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
>> }
>> catch (Exception e) {
>> throw new JBIException("Failed to retrieve Jsch session from
>> pool.", e);
>> }
>>
>> if (session != null && session.getSession().isConnected()) {
>> logger.info("Connected to jsch session at " +
>> session.getSession().getUserName() + "@" + session.getSession().getHost()
>> +
>> ":" + session.getSession().getPort());
>> }
>> return session;
>> }
>>
>> /**
>> * @param client
>> */
>> protected void returnClient(JschSessionWrapper client) {
>> if (client != null) {
>> try {
>> getClientPool().returnObject(client, sshServerInfo);
>> }
>> catch (Exception e) {
>> logger.error("Failed to return client to pool: " + e, e);
>> }
>> }
>> }
>> }
>> --
>> View this message in context:
>> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
>> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>>
>>
>
>
> --
> Cheers,
> Guillaume Nodet
> ------------------------
> Principal Engineer, IONA
> Blog: http://gnodet.blogspot.com/
>
>
--
View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11220100
Sent from the ServiceMix - User mailing list archive at Nabble.com.
Re: SFTP binding component with jcraft
Posted by Guillaume Nodet <gn...@gmail.com>.
This is a message for servicemix-dev, btw.
The code looks good. Would you mind creating a JIRA issue and attaching a patch or
zip with the complete project?
AFAIK, the license for jcraft is BSD, so there should be no problem on this
side.
On 6/20/07, netflexity <mf...@netflexity.com> wrote:
>
>
> Hello, all!
> I have a working SFTP binding component. Please let me know what I need to
> change to commit it.
>
> Here is the sample poller configuration. Sender is available as well:
> <sftp:poller service="sample:processSftpFile"
> targetService="sample:logPollableSshFile" endpoint="soap"
> uri="sftp://user:pass@sshhostname/sftp_dir">
> <sftp:clientPool>
> <ref bean="sshConnectionPool"/>
> </sftp:clientPool>
> </sftp:poller>
>
> /**
> * A polling endpoint which looks for a file or files in a directory
> * and sends the files into the JBI bus as messages, deleting the files
> * by default when they are processed.
> *
> * @org.apache.xbean.XBean element="poller"
> *
> * @version $Revision: 468487 $
> */
> public class SftpPollerEndpoint extends PollingEndpoint implements
> SftpEndpointType {
>
> private KeyedObjectPool sshClientPool;
> private JschPoolableKey sshServerInfo;
> private FileFilter filter;
> private boolean deleteFile = true;
> private boolean recursive = true;
> private FileMarshaler marshaler = new DefaultFileMarshaler();
> private LockManager lockManager;
> private URI uri;
>
> public SftpPollerEndpoint() {
> }
>
> /**
> * @param serviceUnit
> * @param service
> * @param endpoint
> */
> public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service,
> String
> endpoint) {
> super(serviceUnit, service, endpoint);
> }
>
> /**
> * @param component
> * @param endpoint
> */
> public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
> endpoint) {
> super(component, endpoint);
> }
>
> /* (non-Javadoc)
> * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
> */
> public void poll() throws Exception {
> pollFileOrDirectory(getWorkingPath());
> }
>
> /* (non-Javadoc)
> * @see
> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
> */
> public void validate() throws DeploymentException {
> super.validate();
> if (uri == null && (getSshServerInfo() == null ||
> getSshServerInfo().getHost() == null)) {
> throw new DeploymentException("Property uri or SSH Server
> Information must be configured");
> }
> if (uri != null && getSshServerInfo() != null &&
> getSshServerInfo().getHost() != null) {
> throw new DeploymentException("Properties uri and SSH Server
> Information can not be configured at the same time");
> }
> }
>
> /* (non-Javadoc)
> * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
> */
> public void start() throws Exception {
> if (lockManager == null) {
> lockManager = createLockManager();
> }
>
> if (uri != null) {
> String host = uri.getHost();
> int port = uri.getPort();
> String password = "";
> String username = "";
> if (uri.getUserInfo() != null) {
> String[] infos = uri.getUserInfo().split(":");
> username = infos[0];
> if (infos.length > 1) {
> password = infos[1];
> }
> }
> sshServerInfo = new JschPoolableKey(host, port, username,
> password);
> }
> else {
> String str = "sftp://" + sshServerInfo.getHost();
> if (sshServerInfo.getPort() >= 0) {
> str += ":" + sshServerInfo.getPort();
> }
> str += "/";
> uri = new URI(str);
> }
> super.start();
> }
>
> /**
> * @return
> */
> protected LockManager createLockManager() {
> return new SimpleLockManager();
> }
>
> /**
> * @return
> */
> private String getWorkingPath() {
> return (uri != null && uri.getPath() != null) ? uri.getPath() :
> ".";
> }
>
> // Properties
>
>
> //-------------------------------------------------------------------------
> /**
> * @return the clientPool
> */
> public KeyedObjectPool getClientPool() {
> return sshClientPool;
> }
>
> /**
> * @param clientPool the clientPool to set
> */
> public void setClientPool(KeyedObjectPool clientPool) {
> this.sshClientPool = clientPool;
> }
>
> /**
> * @return the sshServerInfo
> */
> public JschPoolableKey getSshServerInfo() {
> return sshServerInfo;
> }
>
> /**
> * @param sshServerInfo the sshServerInfo to set
> */
> public void setSshServerInfo(JschPoolableKey sshServerInfo) {
> this.sshServerInfo = sshServerInfo;
> }
>
> /**
> * @return the uri
> */
> public URI getUri() {
> return uri;
> }
>
> /**
> * @param uri the uri to set
> */
> public void setUri(URI uri) {
> this.uri = uri;
> }
>
> public FileFilter getFilter() {
> return filter;
> }
>
> /**
> * Sets the optional filter to choose which files to process
> */
> public void setFilter(FileFilter filter) {
> this.filter = filter;
> }
>
> /**
> * Returns whether or not we should delete the file when its processed
> */
> public boolean isDeleteFile() {
> return deleteFile;
> }
>
> /**
> * @param deleteFile
> */
> public void setDeleteFile(boolean deleteFile) {
> this.deleteFile = deleteFile;
> }
>
> /**
> * @return
> */
> public boolean isRecursive() {
> return recursive;
> }
>
> /**
> * @param recursive
> */
> public void setRecursive(boolean recursive) {
> this.recursive = recursive;
> }
>
> /**
> * @return
> */
> public FileMarshaler getMarshaler() {
> return marshaler;
> }
>
> /**
> * @param marshaler
> */
> public void setMarshaler(FileMarshaler marshaler) {
> this.marshaler = marshaler;
> }
>
> // Implementation methods
>
>
> //-------------------------------------------------------------------------
>
>
> /**
> * @param fileOrDirectory
> * @throws Exception
> */
> protected void pollFileOrDirectory(String fileOrDirectory) throws
> Exception {
> JschSessionWrapper sftp = borrowClient();
> try {
> logger.debug("Polling directory " + fileOrDirectory);
> Channel channel = sftp.getSession().openChannel("sftp");
> channel.connect();
> ChannelSftp sftpChannel = (ChannelSftp)channel;
> pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
> }
> finally {
> returnClient(sftp);
> }
> }
>
> /**
> * @param sftpChannel
> * @param fileOrDirectory
> * @param processDir
> * @throws Exception
> */
> protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
> fileOrDirectory, boolean processDir) throws Exception {
> Vector files = sftpChannel.ls(fileOrDirectory);
> if(files != null && !files.isEmpty()){
> for(int i=0; i < files.size(); i++){
> LsEntry entry = (LsEntry)files.elementAt(i);
> SftpATTRS attrs = entry.getAttrs();
> String name = entry.getLongname();
>
> // Ignore "." and ".."
> if (name.equals(".") || name.equals("..")) {
> continue;
> }
>
> String file = fileOrDirectory + "/" + name;
> if (!attrs.isDir()) { // This is a file, process
> it.
> if (getFilter() == null || getFilter().accept(new
> File(file))) {
> pollFile(file); // Process the file.
> }
> }
> else if (processDir) { // Only process
> directories if
> processDir is true
> logger.debug("Polling directory " + file);
> pollFileOrDirectory(sftpChannel, file,
> isRecursive());
> }
> else {
> logger.debug("Skipping directory " + file);
> }
> }
> }
> }
>
> /**
> * @param file
> */
> protected void pollFile(final String file) {
> logger.debug("Scheduling file " + file + " for processing");
> getExecutor().execute(new Runnable() {
> public void run() {
> final Lock lock = lockManager.getLock(file);
> if (lock.tryLock()) {
> boolean unlock = true;
> try {
> unlock = processFileAndDelete(file);
> }
> finally {
> if (unlock) {
> lock.unlock();
> }
> }
> }
> }
> });
> }
>
> /**
> * @param file
> * @return
> */
> protected boolean processFileAndDelete(String file) {
> JschSessionWrapper sftp = null;
> boolean unlock = true;
> try {
> sftp = borrowClient();
> Channel channel = sftp.getSession().openChannel("sftp");
> channel.connect();
> ChannelSftp sftpChannel = (ChannelSftp)channel;
>
> // Process the file. If processing fails, an exception should
> be
> thrown.
> logger.debug("Processing file " + file);
> processFile(sftpChannel, file);
>
> // Processing is succesfull, we should not unlock until the
> file
> has been deleted.
> unlock = false;
> if (isDeleteFile()) {
> sftpChannel.rm(file);
> unlock = true;
> }
> }
> catch (Exception e) {
> logger.error("Failed to process file: " + file + ". Reason: "
> +
> e, e);
> }
> finally {
> returnClient(sftp);
> }
> return unlock;
> }
>
> /**
> * @param sftpChannel
> * @param file
> * @throws Exception
> */
> protected void processFile(ChannelSftp sftpChannel, String file)
> throws
> Exception {
> InputStream in = sftpChannel.get(file);
> InOnly exchange = getExchangeFactory().createInOnlyExchange();
> configureExchangeTarget(exchange);
> NormalizedMessage message = exchange.createMessage();
> exchange.setInMessage(message);
> marshaler.readMessage(exchange, message, in, file);
> sendSync(exchange);
> in.close();
> if (exchange.getStatus() == ExchangeStatus.ERROR) {
> Exception e = exchange.getError();
> if (e == null) {
> e = new JBIException("Unkown error");
> }
> throw e;
> }
> }
>
> /* (non-Javadoc)
> * @see
> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
> */
> public String getLocationURI() {
> return uri.toString();
> }
>
> /* (non-Javadoc)
> * @see
> org.apache.servicemix.common.ExchangeProcessor#process(
> javax.jbi.messaging.MessageExchange)
> */
> public void process(MessageExchange exchange) throws Exception {
> // Do nothing. In our case, this method should never be called
> // as we only send synchronous InOnly exchange
> }
>
> /**
> * Retrieve ssh session from a pool.
> *
> * @return
> * @throws JBIException
> */
> protected JschSessionWrapper borrowClient() throws JBIException {
> JschSessionWrapper session = null;
> try {
> session =
> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
> }
> catch (Exception e) {
> throw new JBIException("Failed to retrieve Jsch session from
> pool.", e);
> }
>
> if (session != null && session.getSession().isConnected()) {
> logger.info("Connected to jsch session at " +
> session.getSession().getUserName() + "@" + session.getSession().getHost()
> +
> ":" + session.getSession().getPort());
> }
> return session;
> }
>
> /**
> * @param client
> */
> protected void returnClient(JschSessionWrapper client) {
> if (client != null) {
> try {
> getClientPool().returnObject(client, sshServerInfo);
> }
> catch (Exception e) {
> logger.error("Failed to return client to pool: " + e, e);
> }
> }
> }
> }
> --
> View this message in context:
> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>
>
--
Cheers,
Guillaume Nodet
------------------------
Principal Engineer, IONA
Blog: http://gnodet.blogspot.com/