You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by hb...@apache.org on 2001/04/10 18:01:15 UTC
cvs commit: jakarta-james/src/org/apache/james/nntpserver/repository ArticleIDRepository.java NNTPArticle.java NNTPArticleImpl.java NNTPGroup.java NNTPGroupImpl.java NNTPLineReader.java NNTPLineReaderImpl.java NNTPRepository.java NNTPRepositoryImpl.java NNTPSpooler.java NNTPUtil.java
hbedi 01/04/10 09:01:15
Added: src/org/apache/james/nntpserver/repository
ArticleIDRepository.java NNTPArticle.java
NNTPArticleImpl.java NNTPGroup.java
NNTPGroupImpl.java NNTPLineReader.java
NNTPLineReaderImpl.java NNTPRepository.java
NNTPRepositoryImpl.java NNTPSpooler.java
NNTPUtil.java
Log:
NNTP implementation 1st pass
Revision Changes Path
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/ArticleIDRepository.java
Index: ArticleIDRepository.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.util.*;
import java.io.*;
import org.apache.avalon.util.io.ExtensionFileFilter;
import org.apache.avalon.util.io.InvertedFileFilter;
import org.apache.avalon.util.io.AndFileFilter;
import org.apache.james.nntpserver.NNTPException;
import org.apache.james.nntpserver.DateSinceFileFilter;
import sun.misc.BASE64Encoder;
/**
* ArticleIDRepository: contains one file for each article.
* the file name is Base64 encoded article ID
* The first line of the file is '# <create date of file>
* the rest of line have <newsgroup name>=<article number>
*
* this would allow fast lookup of a message by message id.
* allow a process to iterate and sycnhronize messages with other NNTP Servers.
* this may be inefficient, so could be used for sanity checks and an alternate
* more efficient process could be used for synchronization.
*/
public class ArticleIDRepository {
private final File root;
private final String articleIDDomainSuffix;
private int counter = 0;
ArticleIDRepository(File root,String articleIDDomainSuffix) {
this.root = root;
this.articleIDDomainSuffix = articleIDDomainSuffix;
}
public File getPath() {
return root;
}
String generateArticleID() {
int idx = Math.abs(counter++);
String unique = Thread.currentThread().hashCode()+"."+
System.currentTimeMillis()+"."+idx;
return "<"+unique+"@"+articleIDDomainSuffix+">";
}
/** @param prop contains the newsgroup name and article number */
void addArticle(String articleID,Properties prop) throws IOException {
if ( articleID == null )
articleID = generateArticleID();
FileOutputStream fout = new FileOutputStream(getFileFromID(articleID));
prop.store(fout,new Date().toString());
fout.close();
}
File getFileFromID(String articleID) {
return new File(root,new BASE64Encoder().encode(articleID.getBytes()));
}
boolean isExists(String articleID) {
return ( articleID == null ) ? false : getFileFromID(articleID).exists();
}
NNTPArticle getArticle(NNTPRepository repo,String id) throws IOException {
File f = getFileFromID(id);
if ( f.exists() == false )
return null;
FileInputStream fin = new FileInputStream(f);
Properties prop = new Properties();
prop.load(fin);
fin.close();
Enumeration enum = prop.keys();
NNTPArticle article = null;
while ( article == null && enum.hasMoreElements() ) {
String groupName = (String)enum.nextElement();
int number = Integer.parseInt(prop.getProperty(groupName));
NNTPGroup group = repo.getGroup(groupName);
if ( group != null )
article = group.getArticle(number);
}
return article;
}
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPArticle.java
Index: NNTPArticle.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.io.*;
import javax.mail.internet.InternetHeaders;
import org.apache.james.nntpserver.NNTPException;
public interface NNTPArticle {
NNTPGroup getGroup();
int getArticleNumber();
String getUniqueID();
void writeArticle(PrintWriter wrt);
void writeHead(PrintWriter wrt);
void writeBody(PrintWriter wrt);
void writeOverview(PrintWriter wrt);
String getHeader(String header);
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPArticleImpl.java
Index: NNTPArticleImpl.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.io.*;
import javax.mail.internet.InternetHeaders;
import org.apache.james.nntpserver.NNTPException;
class NNTPArticleImpl implements NNTPArticle {
private final File f;
NNTPArticleImpl(File f) {
this.f = f;
}
public NNTPGroup getGroup() {
return new NNTPGroupImpl(f.getParentFile());
}
public int getArticleNumber() {
return Integer.parseInt(f.getName());
}
public String getUniqueID() {
try {
FileInputStream fin = new FileInputStream(f);
InternetHeaders headers = new InternetHeaders(fin);
String[] idheader = headers.getHeader("Message-Id");
fin.close();
return ( idheader.length > 0 ) ? idheader[0] : null;
} catch(Exception ex) { throw new NNTPException(ex); }
}
public void writeArticle(PrintWriter prt) {
try {
BufferedReader reader = new BufferedReader(new FileReader(f));
String line = null;
while ( ( line = reader.readLine() ) != null )
prt.println(line);
reader.close();
} catch(IOException ex) { throw new NNTPException(ex); }
}
public void writeHead(PrintWriter prt) {
try {
BufferedReader reader = new BufferedReader(new FileReader(f));
String line = null;
while ( ( line = reader.readLine() ) != null ) {
if ( line.trim().length() == 0 )
break;
prt.println(line);
}
reader.close();
} catch(IOException ex) { throw new NNTPException(ex); }
}
public void writeBody(PrintWriter prt) {
try {
BufferedReader reader = new BufferedReader(new FileReader(f));
String line = null;
boolean startWriting = false;
while ( ( line = reader.readLine() ) != null ) {
if ( startWriting )
prt.println(line);
else
startWriting = ( line.trim().length() == 0 );
}
reader.close();
} catch(IOException ex) { throw new NNTPException(ex); }
}
public void writeOverview(PrintWriter prt) {
try {
FileInputStream fin = new FileInputStream(f);
InternetHeaders hdr = new InternetHeaders(fin);
fin.close();
int articleNumber = getArticleNumber();
String subject = hdr.getHeader("Subject",null);
String author = hdr.getHeader("From",null);
String date = hdr.getHeader("Date",null);
String msgId = hdr.getHeader("Message-Id",null);
String references = hdr.getHeader("References",null);
long byteCount = f.length();
long lineCount = -1;
prt.print(articleNumber+"\t");
prt.print((subject==null?"":subject)+"\t");
prt.print((author==null?"":author)+"\t");
prt.print((date==null?"":date)+"\t");
prt.print((msgId==null?"":msgId)+"\t");
prt.print((references==null?"":references)+"\t");
prt.print(byteCount+"\t");
prt.println(lineCount+"");
} catch(Exception ex) { throw new NNTPException(ex); }
}
public String getHeader(String header) {
try {
FileInputStream fin = new FileInputStream(f);
InternetHeaders hdr = new InternetHeaders(fin);
fin.close();
return hdr.getHeader(header,null);
} catch(Exception ex) { throw new NNTPException(ex); }
}
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPGroup.java
Index: NNTPGroup.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.util.*;
import java.io.*;
import org.apache.avalon.util.io.ExtensionFileFilter;
import org.apache.avalon.util.io.InvertedFileFilter;
import org.apache.avalon.util.io.AndFileFilter;
import org.apache.james.nntpserver.NNTPException;
import org.apache.james.nntpserver.DateSinceFileFilter;
public interface NNTPGroup {
String getName();
String getDescription();
boolean isPostAllowed();
// the current article pointer. <0 indicates invalid/unknown value
int getCurrentArticleNumber();
void setCurrentArticleNumber(int articleNumber);
int getNumberOfArticles();
int getFirstArticleNumber();
int getLastArticleNumber();
NNTPArticle getCurrentArticle();
NNTPArticle getArticle(int number);
//NNTPArticle getArticleFromID(String id);
Iterator getArticlesSince(Date dt);
Iterator getArticles();
Object getPath();
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPGroupImpl.java
Index: NNTPGroupImpl.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.util.*;
import java.io.*;
import org.apache.avalon.util.io.ExtensionFileFilter;
import org.apache.avalon.util.io.InvertedFileFilter;
import org.apache.avalon.util.io.AndFileFilter;
import org.apache.james.nntpserver.NNTPException;
import org.apache.james.nntpserver.DateSinceFileFilter;
// group is reprensted by a directory.
// articles are stored in files with the name of file == article number
class NNTPGroupImpl implements NNTPGroup {
private final File root;
private int currentArticle = -1;
private int lastArticle;
private int firstArticle;
// an instance may collect range info once. This involves disk I/O
private boolean articleRangeInfoCollected = false;
NNTPGroupImpl(File root) {
this.root = root;
}
public String getName() {
return root.getName();
}
public String getDescription() {
return getName();
}
public boolean isPostAllowed() {
return true;
}
private void collectArticleRangeInfo() {
if ( articleRangeInfoCollected )
return;
String[] list = root.list();
//new InvertedFileFilter(new ExtensionFileFilter(".id")));
int first = -1;
int last = -1;
for ( int i = 0 ; i < list.length ; i++ ) {
int num = Integer.parseInt(list[i]);
if ( first == -1 || num < first )
first = num;
if ( num > last )
last = num;
}
firstArticle = Math.max(first,0);
lastArticle = Math.max(last,0);
articleRangeInfoCollected = true;
}
public int getNumberOfArticles() {
return getLastArticleNumber() - getFirstArticleNumber();
}
public int getFirstArticleNumber() {
collectArticleRangeInfo();
return firstArticle;
}
public int getLastArticleNumber() {
collectArticleRangeInfo();
return lastArticle;
}
public int getCurrentArticleNumber() {
collectArticleRangeInfo();
// this is not as per RFC, but this is not significant.
if ( currentArticle == -1 && firstArticle > 0 )
currentArticle = firstArticle;
return currentArticle;
}
public void setCurrentArticleNumber(int articleNumber) {
this.currentArticle = articleNumber;
}
public NNTPArticle getCurrentArticle() {
return getArticle(getCurrentArticleNumber());
}
public NNTPArticle getArticle(int number) {
File f = new File(root,number+"");
return f.exists() ? new NNTPArticleImpl(f) : null;
}
// public NNTPArticle getArticleFromID(String id) {
// if ( id == null )
// return null;
// int idx = id.indexOf('@');
// if ( idx != -1 )
// id = id.substring(0,idx);
// File f = new File(root,id+".id");
// if ( f.exists() == false )
// return null;
// try {
// FileInputStream fin = new FileInputStream(f);
// int count = fin.available();
// byte[] ba = new byte[count];
// fin.read(ba);
// fin.close();
// String str = new String(ba);
// int num = Integer.parseInt(str);
// return getArticle(num);
// } catch(IOException ioe) {
// throw new NNTPException("could not fectch article: "+id,ioe);
// }
// }
public Iterator getArticlesSince(Date dt) {
File[] f = root.listFiles(new AndFileFilter
(new DateSinceFileFilter(dt.getTime()),
new InvertedFileFilter(new ExtensionFileFilter(".id"))));
List list = new ArrayList();
for ( int i = 0 ; i < f.length ; i++ )
list.add(new NNTPArticleImpl(f[i]));
return list.iterator();
}
public Iterator getArticles() {
File[] f = root.listFiles();
//(new InvertedFileFilter(new ExtensionFileFilter(".id")));
List list = new ArrayList();
for ( int i = 0 ; i < f.length ; i++ )
list.add(new NNTPArticleImpl(f[i]));
return list.iterator();
}
public Object getPath() {
return root;
}
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPLineReader.java
Index: NNTPLineReader.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.io.*;
// this interface is used to read the data from client and stream it
// into server repository
public interface NNTPLineReader {
// reads a line of data.
// @return null indicates end of data
String readLine();
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPLineReaderImpl.java
Index: NNTPLineReaderImpl.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.io.*;
import org.apache.james.nntpserver.NNTPException;
public class NNTPLineReaderImpl implements NNTPLineReader {
private final BufferedReader reader;
public NNTPLineReaderImpl(BufferedReader reader) {
this.reader = reader;
}
public String readLine() {
try {
String line = reader.readLine();
// check for end of article.
if ( line.equals(".") )
line = null;
else if ( line.startsWith(".") )
line = line.substring(1,line.length());
return line;
} catch(IOException ioe) {
throw new NNTPException("could not create article",ioe);
}
}
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPRepository.java
Index: NNTPRepository.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import org.apache.avalon.Initializable;
import org.apache.avalon.configuration.Configurable;
import org.apache.avalon.configuration.Configuration;
import org.apache.avalon.configuration.ConfigurationException;
import org.apache.avalon.AbstractLoggable;
import java.util.*;
import java.io.*;
import org.apache.avalon.util.io.AndFileFilter;
import org.apache.avalon.util.io.DirectoryFileFilter;
import org.apache.oro.io.GlobFilenameFilter;
public interface NNTPRepository {
NNTPGroup getGroup(String groupName);
NNTPArticle getArticleFromID(String id);
void createArticle(NNTPLineReader reader);
Iterator getMatchedGroups(String wildmat);
Iterator getGroupsSince(Date dt);
Iterator getArticlesSince(Date dt);
boolean isReadOnly();
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPRepositoryImpl.java
Index: NNTPRepositoryImpl.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import org.apache.avalon.Initializable;
import org.apache.avalon.Component;
import org.apache.avalon.configuration.Configurable;
import org.apache.avalon.configuration.Configuration;
import org.apache.avalon.configuration.ConfigurationException;
import org.apache.avalon.AbstractLoggable;
import java.util.*;
import java.io.*;
import org.apache.avalon.util.io.AndFileFilter;
import org.apache.avalon.util.io.DirectoryFileFilter;
import org.apache.oro.io.GlobFilenameFilter;
import org.apache.james.nntpserver.NNTPException;
import org.apache.james.nntpserver.DateSinceFileFilter;
public class NNTPRepositoryImpl extends AbstractLoggable
implements NNTPRepository, Configurable, Initializable, Component
{
private boolean readOnly;
// the groups are located under this path.
private File rootPath;
// articles are temprorily written here and then sent to the spooler.
private File tempPath;
private NNTPSpooler spool;
private ArticleIDRepository articleIDRepo;
private String[] addGroups = null;
public void configure( Configuration configuration ) throws ConfigurationException {
//System.out.println(getClass().getName()+": configure");
//NNTPUtil.show(configuration,System.out);
readOnly = configuration.getChild("readOnly").getValueAsBoolean(false);
rootPath = NNTPUtil.getDirectory(configuration,"rootPath");
tempPath = NNTPUtil.getDirectory(configuration,"tempPath");
File articleIDPath = NNTPUtil.getDirectory(configuration,"articleIDPath");
String articleIDDomainSuffix = configuration.getChild("articleIDDomainSuffix")
.getValue("foo.bar.sho.boo");
articleIDRepo = new ArticleIDRepository(articleIDPath,articleIDDomainSuffix);
spool = (NNTPSpooler)NNTPUtil.createInstance
(configuration.getChild("spool"),getLogger(),
"org.apache.james.nntpserver.repository.NNTPSpooler");
spool.setRepository(this);
spool.setArticleIDRepository(articleIDRepo);
getLogger().debug("repository:readOnly="+readOnly);
getLogger().debug("repository:rootPath="+rootPath.getAbsolutePath());
getLogger().debug("repository:tempPath="+tempPath.getAbsolutePath());
configuration = configuration.getChild("newsgroups");
List addGroupsList = new ArrayList();
if ( configuration != null ) {
Configuration[] children = configuration.getChildren("newsgroup");
if ( children != null )
for ( int i = 0 ; i < children.length ; i++ )
addGroupsList.add(children[i].getValue());
}
addGroups = (String[])addGroupsList.toArray(new String[0]);
getLogger().debug("repository configuration done");
}
public void init() throws Exception {
//System.out.println(getClass().getName()+": init");
if ( rootPath.exists() == false )
rootPath.mkdirs();
for ( int i = 0 ; i < addGroups.length ; i++ ) {
File groupF = new File(rootPath,addGroups[i]);
if ( groupF.exists() == false )
groupF.mkdirs();
}
if ( tempPath.exists() == false )
tempPath.mkdirs();
File articleIDPath = articleIDRepo.getPath();
if ( articleIDPath.exists() == false )
articleIDPath.mkdirs();
if ( spool instanceof Initializable )
((Initializable)spool).init();
getLogger().debug("repository initialization done");
}
public boolean isReadOnly() {
return readOnly;
}
public NNTPGroup getGroup(String groupName) {
File f = new File(rootPath,groupName);
return ( f.exists() && f.isDirectory() ) ? new NNTPGroupImpl(f) : null;
}
public NNTPArticle getArticleFromID(String id) {
try {
return articleIDRepo.getArticle(this,id);
} catch(Exception ex) {
ex.printStackTrace();
return null;
}
// int idx = id.indexOf('@');
// String name = id.substring(0,idx);
// String groupname = id.substring(idx+1);
// NNTPGroup group = getGroup(groupname);
// return ( group == null ) ? null : group.getArticleFromID(name);
}
public void createArticle(NNTPLineReader reader) {
File f = new File(tempPath,System.currentTimeMillis()+"."+Math.random());
try {
FileOutputStream fout = new FileOutputStream(f);
PrintStream prt = new PrintStream(fout,true);
String line;
while ( ( line = reader.readLine() ) != null )
prt.println(line);
prt.close();
f.renameTo(new File(spool.getSpoolPath(),f.getName()));
} catch(IOException ex) {
throw new NNTPException("create article failed",ex);
}
}
public Iterator getMatchedGroups(String wildmat) {
File[] f = rootPath.listFiles(new AndFileFilter
(new DirectoryFileFilter(),new GlobFilenameFilter(wildmat)));
return getGroups(f);
}
private Iterator getGroups(File[] f) {
List list = new ArrayList();
for ( int i = 0 ; i < f.length ; i++ )
list.add(new NNTPGroupImpl(f[i]));
return list.iterator();
}
public Iterator getGroupsSince(Date dt) {
File[] f = rootPath.listFiles(new AndFileFilter
(new DirectoryFileFilter(),new DateSinceFileFilter(dt.getTime())));
return getGroups(f);
}
// gets the list of groups.
// creates iterator that concatenates the article iterators in the list of groups.
// there is at most one article iterator reference for all the groups
public Iterator getArticlesSince(final Date dt) {
final Iterator giter = getGroupsSince(dt);
return new Iterator() {
private Iterator iter = null;
public boolean hasNext() {
if ( iter == null ) {
if ( giter.hasNext() ) {
NNTPGroup group = (NNTPGroup)giter.next();
iter = group.getArticlesSince(dt);
}
else
return false;
}
if ( iter.hasNext() )
return true;
else {
iter = null;
return hasNext();
}
}
public Object next() {
return iter.next();
}
public void remove() {
throw new UnsupportedOperationException("remove not supported");
}
};
}
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPSpooler.java
Index: NNTPSpooler.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.io.*;
import org.apache.avalon.AbstractLoggable;
import org.apache.avalon.Initializable;
import org.apache.avalon.Loggable;
import org.apache.avalon.configuration.Configurable;
import org.apache.avalon.configuration.Configuration;
import org.apache.avalon.configuration.ConfigurationException;
import org.apache.james.util.Lock;
import org.apache.avalon.util.io.IOUtil;
import javax.mail.internet.MimeMessage;
import java.util.*;
// processes entries and sends to appropriate groups.
// eats up inappropriate entries.
class NNTPSpooler extends AbstractLoggable implements Configurable, Initializable {
private Worker[] worker;
private File spoolPath;
public void configure( Configuration configuration ) throws ConfigurationException {
//System.out.println(getClass().getName()+": configure");
//NNTPUtil.show(configuration,System.out);
spoolPath = NNTPUtil.getDirectory(configuration,"spoolPath");
int threadCount = configuration.getChild("threadCount").getValueAsInt(1);
int threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInt(1000);
//String tgName=configuration.getChild("threadGroupName").getValue("NNTPSpooler");
worker = new Worker[threadCount];
for ( int i = 0 ; i < worker.length ; i++ ) {
worker[i] = new Worker(threadIdleTime,spoolPath);
if ( worker[i] instanceof Loggable )
((Loggable)worker[i]).setLogger(getLogger());
}
}
void setRepository(NNTPRepository repo) {
for ( int i = 0 ; i < worker.length ; i++ )
worker[i].setRepository(repo);
}
void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
for ( int i = 0 ; i < worker.length ; i++ )
worker[i].setArticleIDRepository(articleIDRepo);
}
File getSpoolPath() {
if ( spoolPath.exists() == false )
spoolPath.mkdirs();
return spoolPath;
}
public void init() throws Exception {
//System.out.println(getClass().getName()+": init");
for ( int i = 0 ; i < worker.length ; i++ )
new Thread(worker[i],"NNTPSpool-"+i).start();
}
static class Worker extends AbstractLoggable implements Runnable {
private static final Lock lock = new Lock();
private final File spoolPath;
private final int threadIdleTime;
private ArticleIDRepository articleIDRepo;
private NNTPRepository repo;
Worker(int threadIdleTime,File spoolPath) {
this.threadIdleTime = threadIdleTime;
this.spoolPath = spoolPath;
}
void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
this.articleIDRepo = articleIDRepo;
}
void setRepository(NNTPRepository repo) {
this.repo = repo;
}
// the threads race to grab a lock. if a thread wins it processes the article,
// if it loses it tries to lock and process the next article
public void run() {
getLogger().debug("in spool thread");
while ( Thread.currentThread().isInterrupted() == false ) {
String[] list = spoolPath.list();
getLogger().debug("Files to process: "+list.length);
for ( int i = 0 ; i < list.length ; i++ )
if ( lock.lock(list[i]) ) {
File f = new File(spoolPath,list[i]).getAbsoluteFile();
getLogger().debug("processing file: "+f.getAbsolutePath());
try {
process(f);
} catch(Exception ex) {
getLogger().debug("exception occured in processing file: "+
f.getAbsolutePath(),ex);
} finally {
lock.unlock(list[i]);
}
}
getLogger().debug(" Sleeping...");
// this is good for other non idle threads
try { Thread.currentThread().sleep(threadIdleTime);
} catch(InterruptedException ex) { }
}
}
private void process(File f) throws Exception {
getLogger().debug("process: "+f.getAbsolutePath()+","+f.getCanonicalPath());
final MimeMessage msg;
String articleID;
{ // get the message for copying to destination groups.
FileInputStream fin = new FileInputStream(f);
msg = new MimeMessage(null,fin);
fin.close();
// ensure no duplicates exist.
String[] idheader = msg.getHeader("Message-Id");
articleID = (idheader!=null && idheader.length>0?idheader[0]:null);
if ( articleIDRepo.isExists(articleID) ) {
getLogger().debug("message already exists: "+articleID);
f.delete();
return;
}
if ( articleID == null ) {
articleID = articleIDRepo.generateArticleID();
msg.setHeader("Message-Id", articleID);
FileOutputStream fout = new FileOutputStream(f);
msg.writeTo(fout);
fout.close();
}
}
String[] headers = msg.getHeader("Newsgroups");
Properties prop = new Properties();
for ( int i = 0 ; i < headers.length ; i++ ) {
getLogger().debug("copying message to group: "+headers[i]);
NNTPGroup group = repo.getGroup(headers[i]);
if ( group == null ) {
getLogger().debug("group not found: "+headers[i]);
continue;
}
int artNum = group.getLastArticleNumber();
File root = (File)group.getPath();
File articleFile = null;
// this ensures that different threads do not create articles with
// same number
while( true ) {
articleFile = new File(root,(artNum+1)+"");
if (articleFile.createNewFile())
break;
}
getLogger().debug("copying message to: "+articleFile.getAbsolutePath());
prop.setProperty(group.getName(),articleFile.getName());
FileInputStream fin = new FileInputStream(f);
FileOutputStream fout = new FileOutputStream(articleFile);
IOUtil.copy(fin,fout);
fin.close();
fout.close();
}
articleIDRepo.addArticle(articleID,prop);
boolean delSuccess = f.delete();
if ( delSuccess == false )
getLogger().error("could not delete file: "+f.getAbsolutePath());
}
} // class Worker
}
1.1 jakarta-james/src/org/apache/james/nntpserver/repository/NNTPUtil.java
Index: NNTPUtil.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.nntpserver.repository;
import java.io.*;
import org.apache.avalon.Initializable;
import org.apache.avalon.Loggable;
import org.apache.log.Logger;
import org.apache.avalon.configuration.Configurable;
import org.apache.avalon.configuration.Configuration;
import org.apache.avalon.configuration.ConfigurationException;
import org.apache.james.nntpserver.NNTPException;
// processes entries and sends to appropriate groups.
// eats up inappropriate entries.
public class NNTPUtil {
static File getDirectory(Configuration configuration,String child)
throws ConfigurationException
{
String str = configuration.getChild(child).getValue();
File f = new File(str);
if ( f.exists() && f.isFile() )
throw new NNTPException("Expecting '"+f.getAbsolutePath()+"' directory");
if ( f.exists() == false )
f.mkdirs();
return f;
}
public static Object createInstance(Configuration configuration,Logger logger,
String clsName) throws ConfigurationException
{
try { clsName = configuration.getAttribute("class");
} catch(ConfigurationException ce) { }
try {
Object obj = Class.forName(clsName).newInstance();
if ( obj instanceof Loggable )
((Loggable)obj).setLogger( logger );
if ( obj instanceof Configurable )
((Configurable)obj).configure(configuration.getChild("configuration"));
return obj;
} catch(Exception ex) {
ex.printStackTrace();
throw new ConfigurationException("spooler initialization failed",ex);
}
}
public static void show(Configuration conf,PrintStream prt) {
prt.println("conf.getClass="+conf.getClass().getName());
prt.println("name="+conf.getName());
Configuration[] children = conf.getChildren();
for ( int i = 0 ; i < children.length ; i++ )
prt.println(i+". "+children[i].getName());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: james-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: james-dev-help@jakarta.apache.org