Posted to dev@servicemix.apache.org by "Gert Vanthienen (JIRA)" <ji...@apache.org> on 2007/07/16 09:46:45 UTC

[jira] Created: (SM-1004) File poller deletes files, even if errors occur while processing

File poller deletes files, even if errors occur while processing
----------------------------------------------------------------

                 Key: SM-1004
                 URL: https://issues.apache.org/activemq/browse/SM-1004
             Project: ServiceMix
          Issue Type: Bug
          Components: servicemix-file
    Affects Versions: 3.1.1
            Reporter: Gert Vanthienen


Other poller components (e.g. FTP poller) leave the message where it is in case of error/fault, so it can be retried afterwards (e.g. services unavailable, XML message content not yet complete, ...).
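
The retry-friendly behaviour described above can be sketched roughly as follows. This is a minimal illustration in present-day Java, not the servicemix-file code: the class RetryFriendlyPoller, the FileHandler callback and the pollOnce() method are hypothetical names introduced only for this example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative only; this is not the servicemix-file code. */
final class RetryFriendlyPoller {

    /** Hypothetical stand-in for "send the file into the bus and wait for the result". */
    interface FileHandler {
        void handle(Path file) throws Exception;
    }

    /**
     * Processes one file and deletes it only if the handler returned normally.
     *
     * @return true if the file was consumed, false if it was left in place for a retry
     */
    static boolean pollOnce(Path file, FileHandler handler) throws IOException {
        try {
            handler.handle(file);
        } catch (Exception e) {
            // Error or fault: keep the file so a later poll can pick it up again.
            System.err.println("Leaving " + file + " in place: " + e);
            return false;
        }
        Files.delete(file); // only reached after successful processing
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path good = Files.createTempFile("poller-good", ".xml");
        Path bad = Files.createTempFile("poller-bad", ".xml");

        pollOnce(good, f -> { /* pretend the exchange completed */ });
        pollOnce(bad, f -> { throw new IllegalStateException("service unavailable"); });

        System.out.println("good still exists: " + Files.exists(good));
        System.out.println("bad still exists:  " + Files.exists(bad));
        Files.deleteIfExists(bad); // clean up the demo file
    }
}
```

The point of the sketch is only the ordering: the delete happens after the handler has returned without an exception, so a failed file stays available for the next poll.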

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


RE: [jira] Created: (SM-1004) File poller deletes files, even if errors occur while processing

Posted by ra...@wipro.com.
Yes, you are right, Gert. We need to throw the exception based on the
sendSync results.

Regards,
Rabi Mishra, 
http://rabisblog.blogspot.com/
c++; /* this makes c bigger but returns the old value */ 
-----Original Message-----
From: Gert Vanthienen [mailto:gert.vanthienen@skynet.be] 
Sent: Monday, July 16, 2007 2:58 PM
To: servicemix-dev@geronimo.apache.org
Subject: Re: [jira] Created: (SM-1004) File poller deletes files, even
if errors occur while processing

Rabi,


Shouldn't we add something like
    if (exchange.getStatus() == ExchangeStatus.ERROR) {
        Exception e = exchange.getError();
        if (e == null) {
            e = new JBIException("Unknown error");
        }
        throw e;
    }
in the processFile() method as well, to make sure that an exception is
thrown for every MessageExchange that ends in error?


Gert

rabi.mishra@wipro.com wrote:
> Yes,
> 
> It is the lock handling... 
> 
> 1. Every polling thread creates a lock with the file URI and then
> processes the file.
> 2. The lock is released once the file is processed and deleted (managed
> by the unlock flag).
> 3. If there is any exception in processing the file, the lock is not
> released.
> 
> I have just changed it along similar lines to FtpPoller.
> 
> 
> 
> Regards,
> Rabi Mishra, 
> http://rabisblog.blogspot.com/
> c++; /* this makes c bigger but returns the old value */ 
> -----Original Message-----
> From: Gert Vanthienen [mailto:gert.vanthienen@skynet.be] 
> Sent: Monday, July 16, 2007 2:05 PM
> To: servicemix-dev@geronimo.apache.org
> Subject: Re: [jira] Created: (SM-1004) File poller deletes files, even
> if errors occur while processing
> 
> Rabi,
> 
> 
> Thank you for helping out with this!  A lot of this patch seems to be
> formatting changes, but it already saves me the time/effort to figure
> things out.  You mainly changed the lock handling, right?
> 
> Can you explain what you have changed?  Just curious to know how things
> work...
> 
> 
> Gert
> 
> rabi.mishra@wipro.com wrote:
>> Gert,
>>
>> Are you working on this patch? I fixed it some time back and am
>> attaching the patch.
>>
>> -----Original Message-----
>> From: Gert Vanthienen (JIRA) [mailto:jira@apache.org] 
>> Sent: Monday, July 16, 2007 1:17 PM
>> To: servicemix-dev@geronimo.apache.org
>> Subject: [jira] Created: (SM-1004) File poller deletes files, even if
>> errors occur while processing
>>
>> File poller deletes files, even if errors occur while processing
>> ----------------------------------------------------------------
>>
>>                  Key: SM-1004
>>                  URL: https://issues.apache.org/activemq/browse/SM-1004
>>              Project: ServiceMix
>>           Issue Type: Bug
>>           Components: servicemix-file
>>     Affects Versions: 3.1.1
>>             Reporter: Gert Vanthienen
>>
>>
>> Other poller components (e.g. FTP poller) leave the message where it is
>> in case of error/fault, so it can be retried afterwards (e.g. services
>> unavailable, XML message content not yet complete, ...).
>>
>> --
>> This message is automatically generated by JIRA.
>> -
>> You can reply to this email to add a comment to the issue online.
>>
>>
>>
>>
>> The information contained in this electronic message and any
>> attachments to this message are intended for the exclusive use of the
>> addressee(s) and may contain proprietary, confidential or privileged
>> information. If you are not the intended recipient, you should not
>> disseminate, distribute or copy this e-mail. Please notify the sender
>> immediately and destroy all copies of this message and any attachments.
>>
>> WARNING: Computer viruses can be transmitted via email. The recipient
>> should check this email and any attachments for the presence of viruses.
>> The company accepts no liability for any damage caused by any virus
>> transmitted by this email.
>>  
>> www.wipro.com
>>
>>
>>
>> ------------------------------------------------------------------------
>> Index: E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
>> ===================================================================
>> --- E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (revision 555678)
>> +++ E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (working copy)
>> @@ -41,217 +41,229 @@
>>  import java.util.concurrent.locks.Lock;
>>  
>>  /**
>> - * A polling endpoint which looks for a file or files in a directory
>> - * and sends the files into the JBI bus as messages, deleting the files
>> - * by default when they are processed.
>> - *
>> + * A polling endpoint which looks for a file or files in a directory and sends
>> + * the files into the JBI bus as messages, deleting the files by default when
>> + * they are processed.
>> + * 
>>   * @org.apache.xbean.XBean element="poller"
>> - *
>> + * 
>>   * @version $Revision$
>>   */
>> -public class FilePollerEndpoint extends PollingEndpoint implements FileEndpointType {
>> +public class FilePollerEndpoint extends PollingEndpoint implements
>> +		FileEndpointType {
>>  
>> -    private File file;
>> -    private FileFilter filter;
>> -    private boolean deleteFile = true;
>> -    private boolean recursive = true;
>> -    private boolean autoCreateDirectory = true;
>> -    private FileMarshaler marshaler = new DefaultFileMarshaler();
>> -    private LockManager lockManager;
>> +	private File file;
>>  
>> -    public FilePollerEndpoint() {
>> -    }
>> +	private FileFilter filter;
>>  
>> -    public FilePollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
>> -        super(serviceUnit, service, endpoint);
>> -    }
>> +	private boolean deleteFile = true;
>>  
>> -    public FilePollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
>> -        super(component, endpoint);
>> -    }
>> +	private boolean recursive = true;
>>  
>> -    public void poll() throws Exception {
>> -        pollFileOrDirectory(file);
>> -    }
>> +	private boolean autoCreateDirectory = true;
>>  
>> -    public void validate() throws DeploymentException {
>> -        super.validate();
>> -        if (file == null) {
>> -            throw new DeploymentException("You must specify a file property");
>> -        }
>> -        if (isAutoCreateDirectory() && !file.exists()) {
>> -            file.mkdirs();
>> -        }
>> -        if (lockManager == null) {
>> -            lockManager = createLockManager();
>> -        }
>> -    }
>> -    
>> -    protected LockManager createLockManager() {
>> -        return new SimpleLockManager();
>> -    }
>> +	private FileMarshaler marshaler = new DefaultFileMarshaler();
>>  
>> +	private LockManager lockManager;
>>  
>> -    // Properties
>> -    //-------------------------------------------------------------------------
>> -    public File getFile() {
>> -        return file;
>> -    }
>> +	public FilePollerEndpoint() {
>> +	}
>>  
>> -    /**
>> -     * Sets the file to poll, which can be a directory or a file.
>> -     *
>> -     * @param file
>> -     */
>> -    public void setFile(File file) {
>> -        this.file = file;
>> -    }
>> +	public FilePollerEndpoint(ServiceUnit serviceUnit, QName service,
>> +			String endpoint) {
>> +		super(serviceUnit, service, endpoint);
>> +	}
>>  
>> -    /**
>> -     * @return the lockManager
>> -     */
>> -    public LockManager getLockManager() {
>> -        return lockManager;
>> -    }
>> +	public FilePollerEndpoint(DefaultComponent component,
>> +			ServiceEndpoint endpoint) {
>> +		super(component, endpoint);
>> +	}
>>  
>> -    /**
>> -     * @param lockManager the lockManager to set
>> -     */
>> -    public void setLockManager(LockManager lockManager) {
>> -        this.lockManager = lockManager;
>> -    }
>> +	public void poll() throws Exception {
>> +		pollFileOrDirectory(file);
>> +	}
>>  
>> -    public FileFilter getFilter() {
>> -        return filter;
>> -    }
>> +	public void validate() throws DeploymentException {
>> +		super.validate();
>> +		if (file == null) {
>> +			throw new DeploymentException("You must specify a file property");
>> +		}
>> +		if (isAutoCreateDirectory() && !file.exists()) {
>> +			file.mkdirs();
>> +		}
>> +		if (lockManager == null) {
>> +			lockManager = createLockManager();
>> +		}
>> +	}
>>  
>> -    /**
>> -     * Sets the optional filter to choose which files to process
>> -     */
>> -    public void setFilter(FileFilter filter) {
>> -        this.filter = filter;
>> -    }
>> +	protected LockManager createLockManager() {
>> +		return new SimpleLockManager();
>> +	}
>>  
>> -    /**
>> -     * Returns whether or not we should delete the file when its processed
>> -     */
>> -    public boolean isDeleteFile() {
>> -        return deleteFile;
>> -    }
>> +	// Properties
>> +	// -------------------------------------------------------------------------
>> +	public File getFile() {
>> +		return file;
>> +	}
>>  
>> -    public void setDeleteFile(boolean deleteFile) {
>> -        this.deleteFile = deleteFile;
>> -    }
>> +	/**
>> +	 * Sets the file to poll, which can be a directory or a file.
>> +	 * 
>> +	 * @param file
>> +	 */
>> +	public void setFile(File file) {
>> +		this.file = file;
>> +	}
>>  
>> -    public boolean isRecursive() {
>> -        return recursive;
>> -    }
>> +	/**
>> +	 * @return the lockManager
>> +	 */
>> +	public LockManager getLockManager() {
>> +		return lockManager;
>> +	}
>>  
>> -    public void setRecursive(boolean recursive) {
>> -        this.recursive = recursive;
>> -    }
>> +	/**
>> +	 * @param lockManager
>> +	 *            the lockManager to set
>> +	 */
>> +	public void setLockManager(LockManager lockManager) {
>> +		this.lockManager = lockManager;
>> +	}
>>  
>> -    public boolean isAutoCreateDirectory() {
>> -        return autoCreateDirectory;
>> -    }
>> +	public FileFilter getFilter() {
>> +		return filter;
>> +	}
>>  
>> -    public void setAutoCreateDirectory(boolean autoCreateDirectory) {
>> -        this.autoCreateDirectory = autoCreateDirectory;
>> -    }
>> +	/**
>> +	 * Sets the optional filter to choose which files to process
>> +	 */
>> +	public void setFilter(FileFilter filter) {
>> +		this.filter = filter;
>> +	}
>>  
>> -    public FileMarshaler getMarshaler() {
>> -        return marshaler;
>> -    }
>> +	/**
>> +	 * Returns whether or not we should delete the file when its processed
>> +	 */
>> +	public boolean isDeleteFile() {
>> +		return deleteFile;
>> +	}
>>  
>> -    public void setMarshaler(FileMarshaler marshaler) {
>> -        this.marshaler = marshaler;
>> -    }
>> +	public void setDeleteFile(boolean deleteFile) {
>> +		this.deleteFile = deleteFile;
>> +	}
>>  
>> -    // Implementation methods
>> -    //-------------------------------------------------------------------------
>> +	public boolean isRecursive() {
>> +		return recursive;
>> +	}
>>  
>> +	public void setRecursive(boolean recursive) {
>> +		this.recursive = recursive;
>> +	}
>>  
>> -    protected void pollFileOrDirectory(File fileOrDirectory) {
>> -        pollFileOrDirectory(fileOrDirectory, true);
>> -    }
>> +	public boolean isAutoCreateDirectory() {
>> +		return autoCreateDirectory;
>> +	}
>>  
>> -    protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
>> -        if (!fileOrDirectory.isDirectory()) {
>> -            pollFile(fileOrDirectory); // process the file
>> -        }
>> -        else if (processDir) {
>> -            logger.debug("Polling directory " + fileOrDirectory);
>> -            File[] files = fileOrDirectory.listFiles(getFilter());
>> -            for (int i = 0; i < files.length; i++) {
>> -                pollFileOrDirectory(files[i], isRecursive()); // self-recursion
>> -            }
>> -        }
>> -        else {
>> -            logger.debug("Skipping directory " + fileOrDirectory);
>> -        }
>> -    }
>> +	public void setAutoCreateDirectory(boolean autoCreateDirectory) {
>> +		this.autoCreateDirectory = autoCreateDirectory;
>> +	}
>>  
>> -    protected void pollFile(final File aFile) {
>> -        if (logger.isDebugEnabled()) {
>> -            logger.debug("Scheduling file " + aFile + " for processing");
>> -        }
>> -        getExecutor().execute(new Runnable() {
>> -            public void run() {
>> -                String uri = file.toURI().relativize(aFile.toURI()).toString();
>> -                Lock lock = lockManager.getLock(uri);
>> -                if (lock.tryLock()) {
>> -                    try {
>> -                        processFileAndDelete(aFile);
>> -                    }
>> -                    finally {
>> -                        lock.unlock();
>> -                    }
>> -                } else {
>> -                    if (logger.isDebugEnabled()) {
>> -                        logger.debug("Unable to acquire lock on " + aFile);
>> -                    }
>> -                }
>> -            }
>> -        });
>> -    }
>> +	public FileMarshaler getMarshaler() {
>> +		return marshaler;
>> +	}
>>  
>> -    protected void processFileAndDelete(File aFile) {
>> -        try {
>> -            if (logger.isDebugEnabled()) {
>> -                logger.debug("Processing file " + aFile);
>> -            }
>> -            if (aFile.exists()) {
>> -                processFile(aFile);
>> -                if (isDeleteFile()) {
>> -                    if (!aFile.delete()) {
>> -                        throw new IOException("Could not delete file " + aFile);
>> -                    }
>> -                }
>> -            }
>> -        }
>> -        catch (Exception e) {
>> -            logger.error("Failed to process file: " + aFile + ". Reason: " + e, e);
>> -        }
>> -    }
>> +	public void setMarshaler(FileMarshaler marshaler) {
>> +		this.marshaler = marshaler;
>> +	}
>>  
>> -    protected void processFile(File aFile) throws Exception {
>> -        String name = aFile.getCanonicalPath();
>> -        InputStream in = new BufferedInputStream(new FileInputStream(aFile));
>> -        InOnly exchange = getExchangeFactory().createInOnlyExchange();
>> -        configureExchangeTarget(exchange);
>> -        NormalizedMessage message = exchange.createMessage();
>> -        exchange.setInMessage(message);
>> -        marshaler.readMessage(exchange, message, in, name);
>> -        sendSync(exchange);
>> -        in.close();
>> -    }
>> +	// Implementation methods
>> +	// -------------------------------------------------------------------------
>>  
>> -    public String getLocationURI() {
>> -        return file.toURI().toString();
>> -    }
>> +	protected void pollFileOrDirectory(File fileOrDirectory) {
>> +		pollFileOrDirectory(fileOrDirectory, true);
>> +	}
>>  
>> -    public void process(MessageExchange exchange) throws Exception {
>> -        // Do nothing. In our case, this method should never be called
>> -        // as we only send synchronous InOnly exchange
>> -    }
>> +	protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
>> +		if (!fileOrDirectory.isDirectory()) {
>> +			pollFile(fileOrDirectory); // process the file
>> +		} else if (processDir) {
>> +			logger.debug("Polling directory " + fileOrDirectory);
>> +			File[] files = fileOrDirectory.listFiles(getFilter());
>> +			for (int i = 0; i < files.length; i++) {
>> +				pollFileOrDirectory(files[i], isRecursive()); // self-recursion
>> +			}
>> +		} else {
>> +			logger.debug("Skipping directory " + fileOrDirectory);
>> +		}
>> +	}
>> +
>> +	protected void pollFile(final File aFile) {
>> +		if (logger.isDebugEnabled()) {
>> +			logger.debug("Scheduling file " + aFile + " for processing");
>> +		}
>> +		getExecutor().execute(new Runnable() {
>> +			public void run() {
>> +				String uri = file.toURI().relativize(aFile.toURI()).toString();
>> +				Lock lock = lockManager.getLock(uri);
>> +				if (lock.tryLock()) {
>> +					boolean unlock = true;
>> +					try {
>> +						unlock = processFileAndDelete(aFile);
>> +					} finally {
>> +						if (unlock) {
>> +							lock.unlock();
>> +						}
>> +					}
>> +				} else {
>> +					if (logger.isDebugEnabled()) {
>> +						logger.debug("Unable to acquire lock on " + aFile);
>> +					}
>> +				}
>> +			}
>> +		});
>> +	}
>> +
>> +	protected boolean processFileAndDelete(File aFile) {
>> +		boolean unlock = true;
>> +		try {
>> +			if (logger.isDebugEnabled()) {
>> +				logger.debug("Processing file " + aFile);
>> +			}
>> +			if (aFile.exists()) {
>> +				processFile(aFile);
>> +				unlock = false;
>> +				if (isDeleteFile()) {
>> +					if (!aFile.delete()) {
>> +						throw new IOException("Could not delete file " + aFile);
>> +					}
>> +					unlock = true;
>> +				}
>> +			}
>> +		} catch (Exception e) {
>> +			logger.error("Failed to process file: " + aFile + ". Reason: " + e,
>> +					e);
>> +		}
>> +		return unlock;
>> +	}
>> +
>> +	protected void processFile(File aFile) throws Exception {
>> +		String name = aFile.getCanonicalPath();
>> +		InputStream in = new BufferedInputStream(new FileInputStream(aFile));
>> +		InOnly exchange = getExchangeFactory().createInOnlyExchange();
>> +		configureExchangeTarget(exchange);
>> +		NormalizedMessage message = exchange.createMessage();
>> +		exchange.setInMessage(message);
>> +		marshaler.readMessage(exchange, message, in, name);
>> +		sendSync(exchange);
>> +		in.close();
>> +	}
>> +
>> +	public String getLocationURI() {
>> +		return file.toURI().toString();
>> +	}
>> +
>> +	public void process(MessageExchange exchange) throws Exception {
>> +		// Do nothing. In our case, this method should never be called
>> +		// as we only send synchronous InOnly exchange
>> +	}
>>  }
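
For readers following the lock handling, here is a minimal stand-alone sketch of the unlock flag as it reads in the posted diff. It is not the ServiceMix code: the class UnlockFlagSketch, the Processor callback and the two boolean parameters are hypothetical, file handling is simulated, and the aFile.exists() check is left out. As far as the diff shows, the flag stays true when processing throws or when the file was processed and deleted, and turns false when the file was processed but is still on disk (deleteFile switched off, or the delete failed). That is worth comparing with the three-step summary earlier in the thread.

```java
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only; file handling is simulated with booleans. */
final class UnlockFlagSketch {

    /** Hypothetical stand-in for processFile(); throws to simulate a failed exchange. */
    interface Processor {
        void process() throws Exception;
    }

    /** Follows the flag assignments in the posted processFileAndDelete(). */
    static boolean processAndDelete(Processor processor, boolean deleteFile, boolean deleteWorks) {
        boolean unlock = true;
        try {
            processor.process();
            unlock = false; // processed, but the file is still on disk
            if (deleteFile) {
                if (!deleteWorks) {
                    throw new IOException("Could not delete file");
                }
                unlock = true; // processed and deleted
            }
        } catch (Exception e) {
            System.err.println("Failed to process file. Reason: " + e);
        }
        return unlock;
    }

    /** Same shape as the posted run(): the lock is released only when the flag is true. */
    static void pollFile(ReentrantLock lock, Processor processor, boolean deleteFile, boolean deleteWorks) {
        if (lock.tryLock()) {
            boolean unlock = true;
            try {
                unlock = processAndDelete(processor, deleteFile, deleteWorks);
            } finally {
                if (unlock) {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        Processor ok = () -> { };
        Processor failing = () -> { throw new IllegalStateException("exchange ended in error"); };

        System.out.println("processing fails:       unlock=" + processAndDelete(failing, true, true));
        System.out.println("processed and deleted:  unlock=" + processAndDelete(ok, true, true));
        System.out.println("processed, not deleted: unlock=" + processAndDelete(ok, false, true));
        System.out.println("processed, delete fails: unlock=" + processAndDelete(ok, true, false));
    }
}
```

Note that ReentrantLock lets the owning thread acquire the lock again, so whether a kept lock really blocks a later poll depends on which thread runs it and on the LockManager in use.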

Re: [jira] Created: (SM-1004) File poller deletes files, even if errors occur while processing

Posted by Gert Vanthienen <ge...@skynet.be>.
Rabi,


Shouldn't we add something like
    if (exchange.getStatus() == ExchangeStatus.ERROR) {
        Exception e = exchange.getError();
        if (e == null) {
            e = new JBIException("Unknown error");
        }
        throw e;
    }
in the processFile() method as well, to make sure that an exception is 
thrown for every MessageExchange that ends in error?


Gert
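
To see the effect of such a check in isolation, here is a small self-contained sketch. It does not use the JBI API: Status and Exchange are hypothetical stand-ins for ExchangeStatus and MessageExchange, throwIfError() is a made-up helper name, and a plain Exception replaces JBIException so the example compiles without the JBI jar.

```java
/** Self-contained stand-ins; the real types live in the javax.jbi packages. */
final class ExchangeErrorCheck {

    /** Stand-in for the exchange status values used in the snippet. */
    enum Status { ACTIVE, DONE, ERROR }

    /** Stand-in exposing only the two accessors the snippet relies on. */
    static final class Exchange {
        private final Status status;
        private final Exception error;

        Exchange(Status status, Exception error) {
            this.status = status;
            this.error = error;
        }

        Status getStatus() { return status; }
        Exception getError() { return error; }
    }

    /** Throws when the exchange ended in ERROR, so code after the call is skipped. */
    static void throwIfError(Exchange exchange) throws Exception {
        if (exchange.getStatus() == Status.ERROR) {
            Exception e = exchange.getError();
            if (e == null) {
                // The snippet creates a JBIException here; a plain Exception keeps this runnable.
                e = new Exception("Unknown error");
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        throwIfError(new Exchange(Status.DONE, null)); // returns normally
        try {
            throwIfError(new Exchange(Status.ERROR, null));
        } catch (Exception e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Placed right after sendSync(exchange) in processFile(), a check of this shape would make processFileAndDelete() skip the delete, because the exception is raised before aFile.delete() is reached.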

rabi.mishra@wipro.com wrote:
> Yes,
> 
> It is the lock handling... 
> 
> 1. Every polling thread creates a lock with the file URI and then
> processes the file.
> 2. The lock is released once the file is processed and deleted (managed
> by the unlock flag).
> 3. If there is any exception in processing the file, the lock is not
> released.
> 
> I have just changed it along similar lines to FtpPoller.
> 
> 
> 
> Regards,
> Rabi Mishra, 
> http://rabisblog.blogspot.com/
> c++; /* this makes c bigger but returns the old value */ 
> -----Original Message-----
> From: Gert Vanthienen [mailto:gert.vanthienen@skynet.be] 
> Sent: Monday, July 16, 2007 2:05 PM
> To: servicemix-dev@geronimo.apache.org
> Subject: Re: [jira] Created: (SM-1004) File poller deletes files, even
> if errors occur while processing
> 
> Rabi,
> 
> 
> Thank you for helping out with this!  A lot of this patch seems to be
> formatting changes, but it already saves me the time/effort to figure
> things out.  You mainly changed the lock handling, right?
> 
> Can you explain what you have changed?  Just curious to know how things
> work...
> 
> 
> Gert
> 
> rabi.mishra@wipro.com wrote:
>> Gert,
>>
>> Are you working on this patch? I fixed it some time back and am
>> attaching the patch.
>>
>> -----Original Message-----
>> From: Gert Vanthienen (JIRA) [mailto:jira@apache.org] 
>> Sent: Monday, July 16, 2007 1:17 PM
>> To: servicemix-dev@geronimo.apache.org
>> Subject: [jira] Created: (SM-1004) File poller deletes files, even if
>> errors occur while processing
>>
>> File poller deletes files, even if errors occur while processing
>> ----------------------------------------------------------------
>>
>>                  Key: SM-1004
>>                  URL: https://issues.apache.org/activemq/browse/SM-1004
>>              Project: ServiceMix
>>           Issue Type: Bug
>>           Components: servicemix-file
>>     Affects Versions: 3.1.1
>>             Reporter: Gert Vanthienen
>>
>>
>> Other poller components (e.g. FTP poller) leave the message where it is
>> in case of error/fault, so it can be retried afterwards (e.g. services
>> unavailable, XML message content not yet complete, ...).
>>
>> --
>> This message is automatically generated by JIRA.
>> -
>> You can reply to this email to add a comment to the issue online.
>>
>>
>>
>>
>>
>>
>>
>> ------------------------------------------------------------------------
>> Index: E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
>> ===================================================================
>> --- E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (revision 555678)
>> +++ E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (working copy)
>> @@ -41,217 +41,229 @@
>>  import java.util.concurrent.locks.Lock;
>>  
>>  /**
>> - * A polling endpoint which looks for a file or files in a directory
>> - * and sends the files into the JBI bus as messages, deleting the files
>> - * by default when they are processed.
>> - *
>> + * A polling endpoint which looks for a file or files in a directory and sends
>> + * the files into the JBI bus as messages, deleting the files by default when
>> + * they are processed.
>> + * 
>>   * @org.apache.xbean.XBean element="poller"
>> - *
>> + * 
>>   * @version $Revision$
>>   */
>> -public class FilePollerEndpoint extends PollingEndpoint implements FileEndpointType {
>> +public class FilePollerEndpoint extends PollingEndpoint implements
>> +		FileEndpointType {
>>  
>> -    private File file;
>> -    private FileFilter filter;
>> -    private boolean deleteFile = true;
>> -    private boolean recursive = true;
>> -    private boolean autoCreateDirectory = true;
>> -    private FileMarshaler marshaler = new DefaultFileMarshaler();
>> -    private LockManager lockManager;
>> +	private File file;
>>  
>> -    public FilePollerEndpoint() {
>> -    }
>> +	private FileFilter filter;
>>  
>> -    public FilePollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
>> -        super(serviceUnit, service, endpoint);
>> -    }
>> +	private boolean deleteFile = true;
>>  
>> -    public FilePollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
>> -        super(component, endpoint);
>> -    }
>> +	private boolean recursive = true;
>>  
>> -    public void poll() throws Exception {
>> -        pollFileOrDirectory(file);
>> -    }
>> +	private boolean autoCreateDirectory = true;
>>  
>> -    public void validate() throws DeploymentException {
>> -        super.validate();
>> -        if (file == null) {
>> -            throw new DeploymentException("You must specify a file property");
>> -        }
>> -        if (isAutoCreateDirectory() && !file.exists()) {
>> -            file.mkdirs();
>> -        }
>> -        if (lockManager == null) {
>> -            lockManager = createLockManager();
>> -        }
>> -    }
>> -    
>> -    protected LockManager createLockManager() {
>> -        return new SimpleLockManager();
>> -    }
>> +	private FileMarshaler marshaler = new DefaultFileMarshaler();
>>  
>> +	private LockManager lockManager;
>>  
>> -    // Properties
>> -    //-------------------------------------------------------------------------
>> -    public File getFile() {
>> -        return file;
>> -    }
>> +	public FilePollerEndpoint() {
>> +	}
>>  
>> -    /**
>> -     * Sets the file to poll, which can be a directory or a file.
>> -     *
>> -     * @param file
>> -     */
>> -    public void setFile(File file) {
>> -        this.file = file;
>> -    }
>> +	public FilePollerEndpoint(ServiceUnit serviceUnit, QName service,
>> +			String endpoint) {
>> +		super(serviceUnit, service, endpoint);
>> +	}
>>  
>> -    /**
>> -     * @return the lockManager
>> -     */
>> -    public LockManager getLockManager() {
>> -        return lockManager;
>> -    }
>> +	public FilePollerEndpoint(DefaultComponent component,
>> +			ServiceEndpoint endpoint) {
>> +		super(component, endpoint);
>> +	}
>>  
>> -    /**
>> -     * @param lockManager the lockManager to set
>> -     */
>> -    public void setLockManager(LockManager lockManager) {
>> -        this.lockManager = lockManager;
>> -    }
>> +	public void poll() throws Exception {
>> +		pollFileOrDirectory(file);
>> +	}
>>  
>> -    public FileFilter getFilter() {
>> -        return filter;
>> -    }
>> +	public void validate() throws DeploymentException {
>> +		super.validate();
>> +		if (file == null) {
>> +			throw new DeploymentException("You must specify a file property");
>> +		}
>> +		if (isAutoCreateDirectory() && !file.exists()) {
>> +			file.mkdirs();
>> +		}
>> +		if (lockManager == null) {
>> +			lockManager = createLockManager();
>> +		}
>> +	}
>>  
>> -    /**
>> -     * Sets the optional filter to choose which files to process
>> -     */
>> -    public void setFilter(FileFilter filter) {
>> -        this.filter = filter;
>> -    }
>> +	protected LockManager createLockManager() {
>> +		return new SimpleLockManager();
>> +	}
>>  
>> -    /**
>> -     * Returns whether or not we should delete the file when its processed
>> -     */
>> -    public boolean isDeleteFile() {
>> -        return deleteFile;
>> -    }
>> +	// Properties
>> +	// -------------------------------------------------------------------------
>> +	public File getFile() {
>> +		return file;
>> +	}
>>  
>> -    public void setDeleteFile(boolean deleteFile) {
>> -        this.deleteFile = deleteFile;
>> -    }
>> +	/**
>> +	 * Sets the file to poll, which can be a directory or a file.
>> +	 * 
>> +	 * @param file
>> +	 */
>> +	public void setFile(File file) {
>> +		this.file = file;
>> +	}
>>  
>> -    public boolean isRecursive() {
>> -        return recursive;
>> -    }
>> +	/**
>> +	 * @return the lockManager
>> +	 */
>> +	public LockManager getLockManager() {
>> +		return lockManager;
>> +	}
>>  
>> -    public void setRecursive(boolean recursive) {
>> -        this.recursive = recursive;
>> -    }
>> +	/**
>> +	 * @param lockManager
>> +	 *            the lockManager to set
>> +	 */
>> +	public void setLockManager(LockManager lockManager) {
>> +		this.lockManager = lockManager;
>> +	}
>>  
>> -    public boolean isAutoCreateDirectory() {
>> -        return autoCreateDirectory;
>> -    }
>> +	public FileFilter getFilter() {
>> +		return filter;
>> +	}
>>  
>> -    public void setAutoCreateDirectory(boolean autoCreateDirectory) {
>> -        this.autoCreateDirectory = autoCreateDirectory;
>> -    }
>> +	/**
>> +	 * Sets the optional filter to choose which files to process
>> +	 */
>> +	public void setFilter(FileFilter filter) {
>> +		this.filter = filter;
>> +	}
>>  
>> -    public FileMarshaler getMarshaler() {
>> -        return marshaler;
>> -    }
>> +	/**
>> +	 * Returns whether or not we should delete the file when its processed
>> +	 */
>> +	public boolean isDeleteFile() {
>> +		return deleteFile;
>> +	}
>>  
>> -    public void setMarshaler(FileMarshaler marshaler) {
>> -        this.marshaler = marshaler;
>> -    }
>> +	public void setDeleteFile(boolean deleteFile) {
>> +		this.deleteFile = deleteFile;
>> +	}
>>  
>> -    // Implementation methods
>> -    //-------------------------------------------------------------------------
>> +	public boolean isRecursive() {
>> +		return recursive;
>> +	}
>>  
>> +	public void setRecursive(boolean recursive) {
>> +		this.recursive = recursive;
>> +	}
>>  
>> -    protected void pollFileOrDirectory(File fileOrDirectory) {
>> -        pollFileOrDirectory(fileOrDirectory, true);
>> -    }
>> +	public boolean isAutoCreateDirectory() {
>> +		return autoCreateDirectory;
>> +	}
>>  
>> -    protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
>> -        if (!fileOrDirectory.isDirectory()) {
>> -            pollFile(fileOrDirectory); // process the file
>> -        }
>> -        else if (processDir) {
>> -            logger.debug("Polling directory " + fileOrDirectory);
>> -            File[] files = fileOrDirectory.listFiles(getFilter());
>> -            for (int i = 0; i < files.length; i++) {
>> -                pollFileOrDirectory(files[i], isRecursive()); // self-recursion
>> -            }
>> -        }
>> -        else {
>> -            logger.debug("Skipping directory " + fileOrDirectory);
>> -        }
>> -    }
>> +	public void setAutoCreateDirectory(boolean autoCreateDirectory) {
>> +		this.autoCreateDirectory = autoCreateDirectory;
>> +	}
>>  
>> -    protected void pollFile(final File aFile) {
>> -        if (logger.isDebugEnabled()) {
>> -            logger.debug("Scheduling file " + aFile + " for processing");
>> -        }
>> -        getExecutor().execute(new Runnable() {
>> -            public void run() {
>> -                String uri = file.toURI().relativize(aFile.toURI()).toString();
>> -                Lock lock = lockManager.getLock(uri);
>> -                if (lock.tryLock()) {
>> -                    try {
>> -                        processFileAndDelete(aFile);
>> -                    }
>> -                    finally {
>> -                        lock.unlock();
>> -                    }
>> -                } else {
>> -                    if (logger.isDebugEnabled()) {
>> -                        logger.debug("Unable to acquire lock on " + aFile);
>> -                    }
>> -                }
>> -            }
>> -        });
>> -    }
>> +	public FileMarshaler getMarshaler() {
>> +		return marshaler;
>> +	}
>>  
>> -    protected void processFileAndDelete(File aFile) {
>> -        try {
>> -            if (logger.isDebugEnabled()) {
>> -                logger.debug("Processing file " + aFile);
>> -            }
>> -            if (aFile.exists()) {
>> -                processFile(aFile);
>> -                if (isDeleteFile()) {
>> -                    if (!aFile.delete()) {
>> -                        throw new IOException("Could not delete file " + aFile);
>> -                    }
>> -                }
>> -            }
>> -        }
>> -        catch (Exception e) {
>> -            logger.error("Failed to process file: " + aFile + ". Reason: " + e, e);
>> -        }
>> -    }
>> +	public void setMarshaler(FileMarshaler marshaler) {
>> +		this.marshaler = marshaler;
>> +	}
>>  
>> -    protected void processFile(File aFile) throws Exception {
>> -        String name = aFile.getCanonicalPath();
>> -        InputStream in = new BufferedInputStream(new FileInputStream(aFile));
>> -        InOnly exchange = getExchangeFactory().createInOnlyExchange();
>> -        configureExchangeTarget(exchange);
>> -        NormalizedMessage message = exchange.createMessage();
>> -        exchange.setInMessage(message);
>> -        marshaler.readMessage(exchange, message, in, name);
>> -        sendSync(exchange);
>> -        in.close();
>> -    }
>> +	// Implementation methods
>> +	// -------------------------------------------------------------------------
>>  
>> -    public String getLocationURI() {
>> -        return file.toURI().toString();
>> -    }
>> +	protected void pollFileOrDirectory(File fileOrDirectory) {
>> +		pollFileOrDirectory(fileOrDirectory, true);
>> +	}
>>  
>> -    public void process(MessageExchange exchange) throws Exception {
>> -        // Do nothing. In our case, this method should never be called
>> -        // as we only send synchronous InOnly exchange
>> -    }
>> +	protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
>> +		if (!fileOrDirectory.isDirectory()) {
>> +			pollFile(fileOrDirectory); // process the file
>> +		} else if (processDir) {
>> +			logger.debug("Polling directory " + fileOrDirectory);
>> +			File[] files = fileOrDirectory.listFiles(getFilter());
>> +			for (int i = 0; i < files.length; i++) {
>> +				pollFileOrDirectory(files[i], isRecursive()); // self-recursion
>> +			}
>> +		} else {
>> +			logger.debug("Skipping directory " + fileOrDirectory);
>> +		}
>> +	}
>> +
>> +	protected void pollFile(final File aFile) {
>> +		if (logger.isDebugEnabled()) {
>> +			logger.debug("Scheduling file " + aFile + " for processing");
>> +		}
>> +		getExecutor().execute(new Runnable() {
>> +			public void run() {
>> +				String uri = file.toURI().relativize(aFile.toURI()).toString();
>> +				Lock lock = lockManager.getLock(uri);
>> +				if (lock.tryLock()) {
>> +					boolean unlock = true;
>> +					try {
>> +						unlock = processFileAndDelete(aFile);
>> +					} finally {
>> +						if (unlock) {
>> +							lock.unlock();
>> +						}
>> +					}
>> +				} else {
>> +					if (logger.isDebugEnabled()) {
>> +						logger.debug("Unable to acquire lock on " + aFile);
>> +					}
>> +				}
>> +			}
>> +		});
>> +	}
>> +
>> +	protected boolean processFileAndDelete(File aFile) {
>> +		boolean unlock = true;
>> +		try {
>> +			if (logger.isDebugEnabled()) {
>> +				logger.debug("Processing file " + aFile);
>> +			}
>> +			if (aFile.exists()) {
>> +				processFile(aFile);
>> +				unlock = false;
>> +				if (isDeleteFile()) {
>> +					if (!aFile.delete()) {
>> +						throw new IOException("Could not delete file " + aFile);
>> +					}
>> +					unlock = true;
>> +				}
>> +			}
>> +		} catch (Exception e) {
>> +			logger.error("Failed to process file: " + aFile + ". Reason: " + e,
>> +					e);
>> +		}
>> +		return unlock;
>> +	}
>> +
>> +	protected void processFile(File aFile) throws Exception {
>> +		String name = aFile.getCanonicalPath();
>> +		InputStream in = new BufferedInputStream(new FileInputStream(aFile));
>> +		InOnly exchange = getExchangeFactory().createInOnlyExchange();
>> +		configureExchangeTarget(exchange);
>> +		NormalizedMessage message = exchange.createMessage();
>> +		exchange.setInMessage(message);
>> +		marshaler.readMessage(exchange, message, in, name);
>> +		sendSync(exchange);
>> +		in.close();
>> +	}
>> +
>> +	public String getLocationURI() {
>> +		return file.toURI().toString();
>> +	}
>> +
>> +	public void process(MessageExchange exchange) throws Exception {
>> +		// Do nothing. In our case, this method should never be called
>> +		// as we only send synchronous InOnly exchange
>> +	}
>>  }
> 
> 
> The information contained in this electronic message and any attachments to this message are intended for the exclusive use of the addressee(s) and may contain proprietary, confidential or privileged information. If you are not the intended recipient, you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately and destroy all copies of this message and any attachments. 
> 
> WARNING: Computer viruses can be transmitted via email. The recipient should check this email and any attachments for the presence of viruses. The company accepts no liability for any damage caused by any virus transmitted by this email.
>  
> www.wipro.com
> 
> 

RE: [jira] Created: (SM-1004) File poller deletes files, even if errors occur while processing

Posted by ra...@wipro.com.
Yes,

It is the lock handling.

1. Every polling thread creates a lock keyed on the file URI and then
processes the file.
2. The lock is released once the file has been processed and deleted
(managed by the unlock flag).
3. If there is any exception in processing the file, the lock is not
released.

I have just changed it along similar lines to FtpPoller.
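
To make the flag handling concrete, here is a small standalone sketch of
the unlock logic as it reads in the patch quoted in this thread. It is
not the ServiceMix code itself: FileTask and the two boolean parameters
are hypothetical stand-ins for processFile(), isDeleteFile() and
File.delete(), a plain ReentrantLock replaces the LockManager, and the
aFile.exists() check is left out.

```java
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

// Standalone sketch; FileTask and the boolean parameters are hypothetical
// stand-ins for processFile(), isDeleteFile() and File.delete().
class UnlockFlagSketch {

    interface FileTask {
        void run() throws Exception;
    }

    // Same flag logic as the patched processFileAndDelete():
    // returns true when the caller should release the lock.
    static boolean processFileAndDelete(FileTask processFile,
                                        boolean deleteFile,
                                        boolean deleteSucceeds) {
        boolean unlock = true;
        try {
            processFile.run();      // an exception here leaves unlock == true
            unlock = false;
            if (deleteFile) {
                if (!deleteSucceeds) {
                    throw new IOException("Could not delete file");
                }
                unlock = true;      // processed and deleted
            }
        } catch (Exception e) {
            // the patch logs the failure here and carries on
        }
        return unlock;
    }

    // Same shape as the patched run() body; reports whether the lock is
    // still held once the attempt is over.
    static boolean lockHeldAfter(FileTask processFile,
                                 boolean deleteFile,
                                 boolean deleteSucceeds) {
        ReentrantLock lock = new ReentrantLock();
        if (lock.tryLock()) {
            boolean unlock = true;
            try {
                unlock = processFileAndDelete(processFile, deleteFile, deleteSucceeds);
            } finally {
                if (unlock) {
                    lock.unlock();
                }
            }
        }
        return lock.isLocked();
    }

    public static void main(String[] args) {
        FileTask ok = () -> { };
        FileTask failing = () -> { throw new Exception("exchange ended in error"); };

        System.out.println("processed and deleted: held=" + lockHeldAfter(ok, true, true));
        System.out.println("processing failed:     held=" + lockHeldAfter(failing, true, true));
        System.out.println("delete failed:         held=" + lockHeldAfter(ok, true, false));
        System.out.println("deleteFile=false:      held=" + lockHeldAfter(ok, false, true));
    }
}
```

Read literally, the flag is only cleared after processFile() returns, so
in this sketch the lock stays held when the delete fails or when
deleteFile is off, and it is released when processFile() itself throws.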



Regards,
Rabi Mishra, 
http://rabisblog.blogspot.com/
c++; /* this makes c bigger but returns the old value */ 
-----Original Message-----
From: Gert Vanthienen [mailto:gert.vanthienen@skynet.be] 
Sent: Monday, July 16, 2007 2:05 PM
To: servicemix-dev@geronimo.apache.org
Subject: Re: [jira] Created: (SM-1004) File poller deletes files, even
if errors occur while processing

Rabi,


Thank you for helping out with this!  A lot of this patch seems to be
formatting changes, but it already saves me the time/effort to figure
things out.  You mainly changed the lock handling, right?

Can you explain what you have changed?  Just curious to know how things
work...


Gert


Re: [jira] Created: (SM-1004) File poller deletes files, even if errors occur while processing

Posted by Gert Vanthienen <ge...@skynet.be>.
Rabi,


Thank you for helping out with this!  A lot of this patch seems to be 
formatting changes, but it already saves me the time/effort to figure 
things out.  You mainly changed the lock handling, right?

Can you explain what you have changed?  Just curious to know how things 
work...


Gert

rabi.mishra@wipro.com wrote:
> Gert,
> 
> Are you working on this patch? I fixed it some time back and am
> attaching the patch.
> 
> -----Original Message-----
> From: Gert Vanthienen (JIRA) [mailto:jira@apache.org] 
> Sent: Monday, July 16, 2007 1:17 PM
> To: servicemix-dev@geronimo.apache.org
> Subject: [jira] Created: (SM-1004) File poller deletes files, even if
> errors occur while processing
> 
> File poller deletes files, even if errors occur while processing
> ----------------------------------------------------------------
> 
>                  Key: SM-1004
>                  URL: https://issues.apache.org/activemq/browse/SM-1004
>              Project: ServiceMix
>           Issue Type: Bug
>           Components: servicemix-file
>     Affects Versions: 3.1.1
>             Reporter: Gert Vanthienen
> 
> 
> Other poller components (e.g. FTP poller) leave the message where it is
> in case of error/fault, so it can be retried afterwards (e.g. services
> unavailable, XML message content not yet complete, ...).
> 
> --
> This message is automatically generated by JIRA.
> -
> You can reply to this email to add a comment to the issue online.
> 
> 
> 
> 
> 
> 
> ------------------------------------------------------------------------
> 
> Index: E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
> ===================================================================
> --- E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java	(revision 555678)
> +++ E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java	(working copy)
> @@ -41,217 +41,229 @@
>  import java.util.concurrent.locks.Lock;
>  
>  /**
> - * A polling endpoint which looks for a file or files in a directory
> - * and sends the files into the JBI bus as messages, deleting the files
> - * by default when they are processed.
> - *
> + * A polling endpoint which looks for a file or files in a directory and sends
> + * the files into the JBI bus as messages, deleting the files by default when
> + * they are processed.
> + * 
>   * @org.apache.xbean.XBean element="poller"
> - *
> + * 
>   * @version $Revision$
>   */
> -public class FilePollerEndpoint extends PollingEndpoint implements FileEndpointType {
> +public class FilePollerEndpoint extends PollingEndpoint implements
> +		FileEndpointType {
>  
> -    private File file;
> -    private FileFilter filter;
> -    private boolean deleteFile = true;
> -    private boolean recursive = true;
> -    private boolean autoCreateDirectory = true;
> -    private FileMarshaler marshaler = new DefaultFileMarshaler();
> -    private LockManager lockManager;
> +	private File file;
>  
> -    public FilePollerEndpoint() {
> -    }
> +	private FileFilter filter;
>  
> -    public FilePollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
> -        super(serviceUnit, service, endpoint);
> -    }
> +	private boolean deleteFile = true;
>  
> -    public FilePollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
> -        super(component, endpoint);
> -    }
> +	private boolean recursive = true;
>  
> -    public void poll() throws Exception {
> -        pollFileOrDirectory(file);
> -    }
> +	private boolean autoCreateDirectory = true;
>  
> -    public void validate() throws DeploymentException {
> -        super.validate();
> -        if (file == null) {
> -            throw new DeploymentException("You must specify a file property");
> -        }
> -        if (isAutoCreateDirectory() && !file.exists()) {
> -            file.mkdirs();
> -        }
> -        if (lockManager == null) {
> -            lockManager = createLockManager();
> -        }
> -    }
> -    
> -    protected LockManager createLockManager() {
> -        return new SimpleLockManager();
> -    }
> +	private FileMarshaler marshaler = new DefaultFileMarshaler();
>  
> +	private LockManager lockManager;
>  
> -    // Properties
> -    //-------------------------------------------------------------------------
> -    public File getFile() {
> -        return file;
> -    }
> +	public FilePollerEndpoint() {
> +	}
>  
> -    /**
> -     * Sets the file to poll, which can be a directory or a file.
> -     *
> -     * @param file
> -     */
> -    public void setFile(File file) {
> -        this.file = file;
> -    }
> +	public FilePollerEndpoint(ServiceUnit serviceUnit, QName service,
> +			String endpoint) {
> +		super(serviceUnit, service, endpoint);
> +	}
>  
> -    /**
> -     * @return the lockManager
> -     */
> -    public LockManager getLockManager() {
> -        return lockManager;
> -    }
> +	public FilePollerEndpoint(DefaultComponent component,
> +			ServiceEndpoint endpoint) {
> +		super(component, endpoint);
> +	}
>  
> -    /**
> -     * @param lockManager the lockManager to set
> -     */
> -    public void setLockManager(LockManager lockManager) {
> -        this.lockManager = lockManager;
> -    }
> +	public void poll() throws Exception {
> +		pollFileOrDirectory(file);
> +	}
>  
> -    public FileFilter getFilter() {
> -        return filter;
> -    }
> +	public void validate() throws DeploymentException {
> +		super.validate();
> +		if (file == null) {
> +			throw new DeploymentException("You must specify a file property");
> +		}
> +		if (isAutoCreateDirectory() && !file.exists()) {
> +			file.mkdirs();
> +		}
> +		if (lockManager == null) {
> +			lockManager = createLockManager();
> +		}
> +	}
>  
> -    /**
> -     * Sets the optional filter to choose which files to process
> -     */
> -    public void setFilter(FileFilter filter) {
> -        this.filter = filter;
> -    }
> +	protected LockManager createLockManager() {
> +		return new SimpleLockManager();
> +	}
>  
> -    /**
> -     * Returns whether or not we should delete the file when its processed
> -     */
> -    public boolean isDeleteFile() {
> -        return deleteFile;
> -    }
> +	// Properties
> +	// -------------------------------------------------------------------------
> +	public File getFile() {
> +		return file;
> +	}
>  
> -    public void setDeleteFile(boolean deleteFile) {
> -        this.deleteFile = deleteFile;
> -    }
> +	/**
> +	 * Sets the file to poll, which can be a directory or a file.
> +	 * 
> +	 * @param file
> +	 */
> +	public void setFile(File file) {
> +		this.file = file;
> +	}
>  
> -    public boolean isRecursive() {
> -        return recursive;
> -    }
> +	/**
> +	 * @return the lockManager
> +	 */
> +	public LockManager getLockManager() {
> +		return lockManager;
> +	}
>  
> -    public void setRecursive(boolean recursive) {
> -        this.recursive = recursive;
> -    }
> +	/**
> +	 * @param lockManager
> +	 *            the lockManager to set
> +	 */
> +	public void setLockManager(LockManager lockManager) {
> +		this.lockManager = lockManager;
> +	}
>  
> -    public boolean isAutoCreateDirectory() {
> -        return autoCreateDirectory;
> -    }
> +	public FileFilter getFilter() {
> +		return filter;
> +	}
>  
> -    public void setAutoCreateDirectory(boolean autoCreateDirectory) {
> -        this.autoCreateDirectory = autoCreateDirectory;
> -    }
> +	/**
> +	 * Sets the optional filter to choose which files to process
> +	 */
> +	public void setFilter(FileFilter filter) {
> +		this.filter = filter;
> +	}
>  
> -    public FileMarshaler getMarshaler() {
> -        return marshaler;
> -    }
> +	/**
> +	 * Returns whether or not we should delete the file when its processed
> +	 */
> +	public boolean isDeleteFile() {
> +		return deleteFile;
> +	}
>  
> -    public void setMarshaler(FileMarshaler marshaler) {
> -        this.marshaler = marshaler;
> -    }
> +	public void setDeleteFile(boolean deleteFile) {
> +		this.deleteFile = deleteFile;
> +	}
>  
> -    // Implementation methods
> -    //-------------------------------------------------------------------------
> +	public boolean isRecursive() {
> +		return recursive;
> +	}
>  
> +	public void setRecursive(boolean recursive) {
> +		this.recursive = recursive;
> +	}
>  
> -    protected void pollFileOrDirectory(File fileOrDirectory) {
> -        pollFileOrDirectory(fileOrDirectory, true);
> -    }
> +	public boolean isAutoCreateDirectory() {
> +		return autoCreateDirectory;
> +	}
>  
> -    protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
> -        if (!fileOrDirectory.isDirectory()) {
> -            pollFile(fileOrDirectory); // process the file
> -        }
> -        else if (processDir) {
> -            logger.debug("Polling directory " + fileOrDirectory);
> -            File[] files = fileOrDirectory.listFiles(getFilter());
> -            for (int i = 0; i < files.length; i++) {
> -                pollFileOrDirectory(files[i], isRecursive()); // self-recursion
> -            }
> -        }
> -        else {
> -            logger.debug("Skipping directory " + fileOrDirectory);
> -        }
> -    }
> +	public void setAutoCreateDirectory(boolean autoCreateDirectory) {
> +		this.autoCreateDirectory = autoCreateDirectory;
> +	}
>  
> -    protected void pollFile(final File aFile) {
> -        if (logger.isDebugEnabled()) {
> -            logger.debug("Scheduling file " + aFile + " for processing");
> -        }
> -        getExecutor().execute(new Runnable() {
> -            public void run() {
> -                String uri = file.toURI().relativize(aFile.toURI()).toString();
> -                Lock lock = lockManager.getLock(uri);
> -                if (lock.tryLock()) {
> -                    try {
> -                        processFileAndDelete(aFile);
> -                    }
> -                    finally {
> -                        lock.unlock();
> -                    }
> -                } else {
> -                    if (logger.isDebugEnabled()) {
> -                        logger.debug("Unable to acquire lock on " + aFile);
> -                    }
> -                }
> -            }
> -        });
> -    }
> +	public FileMarshaler getMarshaler() {
> +		return marshaler;
> +	}
>  
> -    protected void processFileAndDelete(File aFile) {
> -        try {
> -            if (logger.isDebugEnabled()) {
> -                logger.debug("Processing file " + aFile);
> -            }
> -            if (aFile.exists()) {
> -                processFile(aFile);
> -                if (isDeleteFile()) {
> -                    if (!aFile.delete()) {
> -                        throw new IOException("Could not delete file " + aFile);
> -                    }
> -                }
> -            }
> -        }
> -        catch (Exception e) {
> -            logger.error("Failed to process file: " + aFile + ". Reason: " + e, e);
> -        }
> -    }
> +	public void setMarshaler(FileMarshaler marshaler) {
> +		this.marshaler = marshaler;
> +	}
>  
> -    protected void processFile(File aFile) throws Exception {
> -        String name = aFile.getCanonicalPath();
> -        InputStream in = new BufferedInputStream(new FileInputStream(aFile));
> -        InOnly exchange = getExchangeFactory().createInOnlyExchange();
> -        configureExchangeTarget(exchange);
> -        NormalizedMessage message = exchange.createMessage();
> -        exchange.setInMessage(message);
> -        marshaler.readMessage(exchange, message, in, name);
> -        sendSync(exchange);
> -        in.close();
> -    }
> +	// Implementation methods
> +	// -------------------------------------------------------------------------
>  
> -    public String getLocationURI() {
> -        return file.toURI().toString();
> -    }
> +	protected void pollFileOrDirectory(File fileOrDirectory) {
> +		pollFileOrDirectory(fileOrDirectory, true);
> +	}
>  
> -    public void process(MessageExchange exchange) throws Exception {
> -        // Do nothing. In our case, this method should never be called
> -        // as we only send synchronous InOnly exchange
> -    }
> +	protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
> +		if (!fileOrDirectory.isDirectory()) {
> +			pollFile(fileOrDirectory); // process the file
> +		} else if (processDir) {
> +			logger.debug("Polling directory " + fileOrDirectory);
> +			File[] files = fileOrDirectory.listFiles(getFilter());
> +			for (int i = 0; i < files.length; i++) {
> +				pollFileOrDirectory(files[i], isRecursive()); // self-recursion
> +			}
> +		} else {
> +			logger.debug("Skipping directory " + fileOrDirectory);
> +		}
> +	}
> +
> +	protected void pollFile(final File aFile) {
> +		if (logger.isDebugEnabled()) {
> +			logger.debug("Scheduling file " + aFile + " for processing");
> +		}
> +		getExecutor().execute(new Runnable() {
> +			public void run() {
> +				String uri = file.toURI().relativize(aFile.toURI()).toString();
> +				Lock lock = lockManager.getLock(uri);
> +				if (lock.tryLock()) {
> +					boolean unlock = true;
> +					try {
> +						unlock = processFileAndDelete(aFile);
> +					} finally {
> +						if (unlock) {
> +							lock.unlock();
> +						}
> +					}
> +				} else {
> +					if (logger.isDebugEnabled()) {
> +						logger.debug("Unable to acquire lock on " + aFile);
> +					}
> +				}
> +			}
> +		});
> +	}
> +
> +	protected boolean processFileAndDelete(File aFile) {
> +		boolean unlock = true;
> +		try {
> +			if (logger.isDebugEnabled()) {
> +				logger.debug("Processing file " + aFile);
> +			}
> +			if (aFile.exists()) {
> +				processFile(aFile);
> +				unlock = false;
> +				if (isDeleteFile()) {
> +					if (!aFile.delete()) {
> +						throw new IOException("Could not delete file " + aFile);
> +					}
> +					unlock = true;
> +				}
> +			}
> +		} catch (Exception e) {
> +			logger.error("Failed to process file: " + aFile + ". Reason: " + e,
> +					e);
> +		}
> +		return unlock;
> +	}
> +
> +	protected void processFile(File aFile) throws Exception {
> +		String name = aFile.getCanonicalPath();
> +		InputStream in = new BufferedInputStream(new FileInputStream(aFile));
> +		InOnly exchange = getExchangeFactory().createInOnlyExchange();
> +		configureExchangeTarget(exchange);
> +		NormalizedMessage message = exchange.createMessage();
> +		exchange.setInMessage(message);
> +		marshaler.readMessage(exchange, message, in, name);
> +		sendSync(exchange);
> +		in.close();
> +	}
> +
> +	public String getLocationURI() {
> +		return file.toURI().toString();
> +	}
> +
> +	public void process(MessageExchange exchange) throws Exception {
> +		// Do nothing. In our case, this method should never be called
> +		// as we only send synchronous InOnly exchange
> +	}
>  }
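
For readers following the thread, the behaviour requested in SM-1004 (leave the file in place when processing fails, so that it can be retried afterwards) can be sketched independently of ServiceMix. The following is a minimal illustration in plain Java, not the committed fix: the class DeleteOnSuccessSketch, the FileHandler interface and the processAndDelete method are hypothetical names, and the handler merely stands in for marshaling the file and sending the exchange.

```java
import java.io.File;

/** Hypothetical helper for illustration only; not part of servicemix-file. */
class DeleteOnSuccessSketch {

    /** Hypothetical stand-in for "read the file and send it on". */
    interface FileHandler {
        void handle(File file) throws Exception;
    }

    /**
     * Runs the handler and deletes the file only if the handler returned
     * normally. Returns true when processing (and, if requested, deletion)
     * succeeded, false otherwise.
     */
    static boolean processAndDelete(File file, FileHandler handler, boolean deleteFile) {
        if (!file.exists()) {
            return false;
        }
        try {
            handler.handle(file);
        } catch (Exception e) {
            // Processing failed: keep the file so that a later poll can retry it.
            System.err.println("Failed to process file: " + file + ". Reason: " + e);
            return false;
        }
        // Only reached when the handler did not throw.
        if (deleteFile && !file.delete()) {
            System.err.println("Could not delete file " + file);
            return false;
        }
        return true;
    }
}
```

With this shape, a handler that throws leaves the file on disk, while a handler that returns normally lets the file be removed when deleteFile is true.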

RE: [jira] Created: (SM-1004) File poller deletes files, even if errors occur while processing

Posted by ra...@wipro.com.
Gert,

Are you working on a patch for this? I fixed it some time ago and
am attaching the patch.

-----Original Message-----
From: Gert Vanthienen (JIRA) [mailto:jira@apache.org] 
Sent: Monday, July 16, 2007 1:17 PM
To: servicemix-dev@geronimo.apache.org
Subject: [jira] Created: (SM-1004) File poller deletes files, even if
errors occur while processing

File poller deletes files, even if errors occur while processing
----------------------------------------------------------------

                 Key: SM-1004
                 URL: https://issues.apache.org/activemq/browse/SM-1004
             Project: ServiceMix
          Issue Type: Bug
          Components: servicemix-file
    Affects Versions: 3.1.1
            Reporter: Gert Vanthienen


Other poller components (e.g. FTP poller) leave the message where it is
in case of error/fault, so it can be retried afterwards (e.g. services
unavailable, XML message content not yet complete, ...).






[jira] Resolved: (SM-1004) File poller deletes files, even if errors occur while processing

Posted by "Gert Vanthienen (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/activemq/browse/SM-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gert Vanthienen resolved SM-1004.
---------------------------------

       Resolution: Fixed
    Fix Version/s: 3.2
                   3.1.2

Thanks to Rabi Mishra for the initial patch and help in resolving this issue!

http://svn.apache.org/viewvc?view=rev&revision=556580
http://svn.apache.org/viewvc?view=rev&revision=556595

> File poller deletes files, even if errors occur while processing
> ----------------------------------------------------------------
>
>                 Key: SM-1004
>                 URL: https://issues.apache.org/activemq/browse/SM-1004
>             Project: ServiceMix
>          Issue Type: Bug
>          Components: servicemix-file
>    Affects Versions: 3.1.1
>            Reporter: Gert Vanthienen
>             Fix For: 3.1.2, 3.2
>
>
> Other poller components (e.g. FTP poller) leave the message where it is in case of error/fault, so it can be retried afterwards (e.g. services unavailable, XML message content not yet complete, ...).
