You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/04/30 12:44:49 UTC
svn commit: r533687 - in
/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file:
FileConsumer.java FileEndpoint.java FileExchange.java FileProducer.java
Author: rajdavies
Date: Mon Apr 30 03:44:48 2007
New Revision: 533687
URL: http://svn.apache.org/viewvc?view=rev&rev=533687
Log:
allow writing if file locking not selected as an option
Modified:
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java
Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Apr 30 03:44:48 2007
@@ -17,20 +17,11 @@
*/
package org.apache.camel.component.file;
-import java.io.BufferedInputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.RandomAccessFile;
-import java.net.SocketAddress;
import java.nio.channels.FileChannel;
-import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.locks.Lock;
-import javax.management.Query;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.PollingConsumer;
import org.apache.commons.logging.Log;
@@ -39,134 +30,126 @@
/**
* @version $Revision: 523016 $
*/
-public class FileConsumer extends PollingConsumer<FileExchange> {
- private static final transient Log log = LogFactory.getLog(FileConsumer.class);
+public class FileConsumer extends PollingConsumer<FileExchange>{
+ private static final transient Log log=LogFactory.getLog(FileConsumer.class);
private final FileEndpoint endpoint;
private boolean recursive=true;
private boolean attemptFileLock=false;
- private String regexPattern = "";
- private long lastPollTime = 0l;
-
-
-
- public FileConsumer(final FileEndpoint endpoint, Processor<FileExchange> processor,ScheduledExecutorService executor) {
- super(endpoint, processor,executor);
- this.endpoint = endpoint;
-
-
+ private String regexPattern="";
+ private long lastPollTime=0l;
+
+ public FileConsumer(final FileEndpoint endpoint,Processor<FileExchange> processor,ScheduledExecutorService executor){
+ super(endpoint,processor,executor);
+ this.endpoint=endpoint;
}
- protected void poll() throws Exception {
+
+ protected void poll() throws Exception{
pollFileOrDirectory(endpoint.getFile(),isRecursive());
lastPollTime=System.currentTimeMillis();
}
-
-
- protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
- if (!fileOrDirectory.isDirectory()) {
+
+ protected void pollFileOrDirectory(File fileOrDirectory,boolean processDir){
+ if(!fileOrDirectory.isDirectory()){
pollFile(fileOrDirectory); // process the file
- }
- else if (processDir) {
- log.debug("Polling directory " + fileOrDirectory);
- File[] files = fileOrDirectory.listFiles();
- for (int i = 0; i < files.length; i++) {
- pollFileOrDirectory(files[i], isRecursive()); // self-recursion
+ }else if(processDir){
+ log.debug("Polling directory "+fileOrDirectory);
+ File[] files=fileOrDirectory.listFiles();
+ for(int i=0;i<files.length;i++){
+ pollFileOrDirectory(files[i],isRecursive()); // self-recursion
}
- }
- else {
- log.debug("Skipping directory " + fileOrDirectory);
+ }else{
+ log.debug("Skipping directory "+fileOrDirectory);
}
}
- protected void pollFile(final File file) {
- if (file.exists() && file.lastModified() > lastPollTime) {
- if (isValidFile(file)) {
+ protected void pollFile(final File file){
+ if(file.exists()&&file.lastModified()>lastPollTime){
+ if(isValidFile(file)){
processFile(file);
}
}
}
-
-
- protected void processFile(File file) {
+ protected void processFile(File file){
getProcessor().process(endpoint.createExchange(file));
}
-
-
-
+
protected boolean isValidFile(File file){
boolean result=false;
if(file!=null&&file.exists()){
- if (isMatched(file)) {
- if(isAttemptFileLock()){
- FileChannel fc=null;
- try{
- fc=new RandomAccessFile(file,"rw").getChannel();
- fc.lock();
- result=true;
- }catch(Throwable e){
- }finally{
- if(fc!=null){
- try{
- fc.close();
- }catch(IOException e){
+ if(isMatched(file)){
+ if(isAttemptFileLock()){
+ FileChannel fc=null;
+ try{
+ fc=new RandomAccessFile(file,"rw").getChannel();
+ fc.lock();
+ result=true;
+ }catch(Throwable e){
+ log.debug("Failed to get the lock on file: " + file,e);
+ }finally{
+ if(fc!=null){
+ try{
+ fc.close();
+ }catch(IOException e){
+ }
}
}
+ }else{
+ result=true;
}
}
- }
}
return result;
}
-
- protected boolean isMatched(File file) {
- boolean result = true;
- if ( regexPattern != null && regexPattern.length() > 0 ) {
- result = file.getName().matches(getRegexPattern());
+
+ protected boolean isMatched(File file){
+ boolean result=true;
+ if(regexPattern!=null&&regexPattern.length()>0){
+ result=file.getName().matches(getRegexPattern());
}
return result;
}
-
+
/**
* @return the recursive
*/
public boolean isRecursive(){
return this.recursive;
}
-
+
/**
* @param recursive the recursive to set
*/
public void setRecursive(boolean recursive){
this.recursive=recursive;
}
-
+
/**
* @return the attemptFileLock
*/
public boolean isAttemptFileLock(){
return this.attemptFileLock;
}
-
+
/**
* @param attemptFileLock the attemptFileLock to set
*/
public void setAttemptFileLock(boolean checkAppending){
this.attemptFileLock=checkAppending;
}
-
+
/**
* @return the regexPattern
*/
public String getRegexPattern(){
return this.regexPattern;
}
-
+
/**
* @param regexPattern the regexPattern to set
*/
public void setRegexPattern(String regexPattern){
this.regexPattern=regexPattern;
}
-
}
Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java Mon Apr 30 03:44:48 2007
@@ -46,7 +46,7 @@
* @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor)
*/
public Consumer<FileExchange> createConsumer(Processor<FileExchange> file) throws Exception{
- return new FileConsumer(this, file, executor);
+ return new FileConsumer(this, file, getExecutor());
}
/**
@@ -103,4 +103,6 @@
return true;
}
+
+
}
Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java Mon Apr 30 03:44:48 2007
@@ -23,7 +23,7 @@
import org.apache.camel.impl.DefaultExchange;
/**
- * A {@link Exchange} for MINA
+ * A {@link Exchange} for File
*
* @version $Revision: 520985 $
*/
Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java?view=diff&rev=533687&r1=533686&r2=533687
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java (original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java Mon Apr 30 03:44:48 2007
@@ -24,30 +24,32 @@
import org.apache.commons.logging.LogFactory;
/**
- * A {@link Producer} implementation for MINA
+ * A {@link Producer} implementation for File
*
* @version $Revision: 523016 $
*/
public class FileProducer extends DefaultProducer<FileExchange>{
- private static final transient Log log = LogFactory.getLog(FileProducer.class);
+ private static final transient Log log=LogFactory.getLog(FileProducer.class);
private final FileEndpoint endpoint;
+
public FileProducer(FileEndpoint endpoint){
super(endpoint);
- this.endpoint = endpoint;
+ this.endpoint=endpoint;
}
/**
- * @param arg0
+ * @param exchange
* @see org.apache.camel.Processor#process(java.lang.Object)
*/
public void process(FileExchange exchange){
- ByteBuffer payload = exchange.getIn().getBody(ByteBuffer.class);
- File file = null;
- if (endpoint.getFile() != null && endpoint.getFile().isDirectory()) {
- file = new File(endpoint.getFile(),exchange.getFile().getName());
- }else {
- file = exchange.getFile();
+ ByteBuffer payload=exchange.getIn().getBody(ByteBuffer.class);
+ payload.flip();
+ File file=null;
+ if(endpoint.getFile()!=null&&endpoint.getFile().isDirectory()){
+ file=new File(endpoint.getFile(),exchange.getFile().getName());
+ }else{
+ file=exchange.getFile();
}
try{
FileChannel fc=new RandomAccessFile(file,"rw").getChannel();
@@ -55,7 +57,7 @@
fc.write(payload);
fc.close();
}catch(Throwable e){
- log.error("Failed to write to File: " + file,e);
+ log.error("Failed to write to File: "+file,e);
}
}
}