You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2006/01/31 17:13:17 UTC

svn commit: r373853 [2/6] - in /lucene/nutch/trunk/src: java/org/apache/nutch/analysis/ java/org/apache/nutch/clustering/ java/org/apache/nutch/crawl/ java/org/apache/nutch/fetcher/ java/org/apache/nutch/fs/ java/org/apache/nutch/indexer/ java/org/apac...

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java Tue Jan 31 08:08:58 2006
@@ -19,19 +19,22 @@
 import java.io.*;
 import org.apache.lucene.store.*;
 import org.apache.nutch.fs.*;
+import org.apache.nutch.util.NutchConf;
 
 /** Reads a Lucene index stored in NDFS. */
 public class NdfsDirectory extends Directory {
 
   private NutchFileSystem fs;
   private File directory;
+  private int ioFileBufferSize;
 
-  public NdfsDirectory(NutchFileSystem fs, File directory, boolean create)
+  public NdfsDirectory(NutchFileSystem fs, File directory, boolean create, NutchConf nutchConf)
     throws IOException {
 
     this.fs = fs;
     this.directory = directory;
-
+    this.ioFileBufferSize = nutchConf.getInt("io.file.buffer.size", 4096);
+    
     if (create) {
       create();
     }
@@ -103,12 +106,12 @@
     if (fs.exists(file) && !fs.delete(file))      // delete existing, if any
       throw new IOException("Cannot overwrite: " + file);
 
-    return new NdfsIndexOutput(file);
+    return new NdfsIndexOutput(file, this.ioFileBufferSize);
   }
 
 
   public IndexInput openInput(String name) throws IOException {
-    return new NdfsIndexInput(new File(directory, name));
+    return new NdfsIndexInput(new File(directory, name), this.ioFileBufferSize);
   }
 
   public Lock makeLock(final String name) {
@@ -152,7 +155,7 @@
     private class Descriptor {
       public NFSDataInputStream in;
       public long position;                       // cache of in.getPos()
-      public Descriptor(File file) throws IOException {
+      public Descriptor(File file, int ioFileBufferSize) throws IOException {
         this.in = fs.open(file);
       }
     }
@@ -161,8 +164,8 @@
     private final long length;
     private boolean isClone;
 
-    public NdfsIndexInput(File path) throws IOException {
-      descriptor = new Descriptor(path);
+    public NdfsIndexInput(File path, int ioFileBufferSize) throws IOException {
+      descriptor = new Descriptor(path,ioFileBufferSize);
       length = fs.getLength(path);
     }
 
@@ -211,7 +214,7 @@
   private class NdfsIndexOutput extends BufferedIndexOutput {
     private NFSDataOutputStream out;
 
-    public NdfsIndexOutput(File path) throws IOException {
+    public NdfsIndexOutput(File path, int ioFileBufferSize) throws IOException {
       out = fs.create(path);
     }
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java Tue Jan 31 08:08:58 2006
@@ -46,8 +46,8 @@
     private LongWritable key = new LongWritable();
 
     /** Construct an array reader for the named file.*/
-    public Reader(NutchFileSystem nfs, String file) throws IOException {
-      super(nfs, file);
+    public Reader(NutchFileSystem nfs, String file, NutchConf nutchConf) throws IOException {
+      super(nfs, file, nutchConf);
     }
 
     /** Positions the reader before its <code>n</code>th value. */

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java Tue Jan 31 08:08:58 2006
@@ -44,11 +44,8 @@
   /** The name of the data file. */
   public static final String DATA_FILE_NAME = "data";
 
-  /** Number of index entries to skip between each entry.  Zero by default.
-   * Setting this to values larger than zero can facilitate opening large map
-   * files using less memory. */
-  public static final int INDEX_SKIP =
-    NutchConf.get().getInt("io.map.index.skip", 0);
+
+
 
   protected MapFile() {}                          // no public ctor
 
@@ -160,6 +157,12 @@
   
   /** Provide access to an existing map. */
   public static class Reader {
+      
+    /** Number of index entries to skip between each entry.  Zero by default.
+    * Setting this to values larger than zero can facilitate opening large map
+    * files using less memory. */
+    private int INDEX_SKIP = 0;
+      
     private WritableComparator comparator;
 
     private DataOutputBuffer keyBuf = new DataOutputBuffer();
@@ -190,19 +193,20 @@
     public Class getValueClass() { return data.getValueClass(); }
 
     /** Construct a map reader for the named map.*/
-    public Reader(NutchFileSystem nfs, String dirName) throws IOException {
-      this(nfs, dirName, null);
+    public Reader(NutchFileSystem nfs, String dirName, NutchConf nutchConf) throws IOException {
+      this(nfs, dirName, null, nutchConf);
+      INDEX_SKIP = nutchConf.getInt("io.map.index.skip", 0);
     }
 
     /** Construct a map reader for the named map using the named comparator.*/
-    public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator)
+    public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator, NutchConf nutchConf)
       throws IOException {
       File dir = new File(dirName);
       File dataFile = new File(dir, DATA_FILE_NAME);
       File indexFile = new File(dir, INDEX_FILE_NAME);
 
       // open the data
-      this.data = new SequenceFile.Reader(nfs, dataFile.getPath());
+      this.data = new SequenceFile.Reader(nfs, dataFile.getPath(),  nutchConf);
       this.firstPosition = data.getPosition();
 
       if (comparator == null)
@@ -213,7 +217,7 @@
       this.getKey = this.comparator.newKey();
 
       // open the index
-      this.index = new SequenceFile.Reader(nfs, indexFile.getPath());
+      this.index = new SequenceFile.Reader(nfs, indexFile.getPath(), nutchConf);
     }
 
     private void readIndex() throws IOException {
@@ -416,7 +420,7 @@
    * @throws Exception
    */
   public static long fix(NutchFileSystem nfs, File dir,
-          Class keyClass, Class valueClass, boolean dryrun) throws Exception {
+          Class keyClass, Class valueClass, boolean dryrun, NutchConf nutchConf) throws Exception {
     String dr = (dryrun ? "[DRY RUN ] " : "");
     File data = new File(dir, DATA_FILE_NAME);
     File index = new File(dir, INDEX_FILE_NAME);
@@ -429,7 +433,7 @@
       // no fixing needed
       return -1;
     }
-    SequenceFile.Reader dataReader = new SequenceFile.Reader(nfs, data.toString());
+    SequenceFile.Reader dataReader = new SequenceFile.Reader(nfs, data.toString(), nutchConf);
     if (!dataReader.getKeyClass().equals(keyClass)) {
       throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
               ", got " + dataReader.getKeyClass().getName());
@@ -474,8 +478,10 @@
     String in = args[0];
     String out = args[1];
 
-    NutchFileSystem nfs = new LocalFileSystem();
-    MapFile.Reader reader = new MapFile.Reader(nfs, in);
+    NutchConf conf = new NutchConf();
+    int ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
+    NutchFileSystem nfs = new LocalFileSystem(conf);
+    MapFile.Reader reader = new MapFile.Reader(nfs, in, conf);
     MapFile.Writer writer =
       new MapFile.Writer(nfs, out, reader.getKeyClass(), reader.getValueClass());
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java Tue Jan 31 08:08:58 2006
@@ -25,13 +25,17 @@
 import java.io.*;
 import java.util.*;
 
+import org.apache.nutch.util.NutchConf;
+import org.apache.nutch.util.NutchConfigurable;
+
 /** A polymorphic Writable that writes an instance with it's class name.
  * Handles arrays, strings and primitive types without a Writable wrapper.
  */
-public class ObjectWritable implements Writable {
+public class ObjectWritable implements Writable, NutchConfigurable {
 
   private Class declaredClass;
   private Object instance;
+  private NutchConf nutchConf;
 
   public ObjectWritable() {}
   
@@ -57,7 +61,7 @@
   }
   
   public void readFields(DataInput in) throws IOException {
-    readObject(in, this);
+    readObject(in, this, this.nutchConf);
   }
   
   public void write(DataOutput out) throws IOException {
@@ -165,14 +169,14 @@
   
   /** Read a {@link Writable}, {@link String}, primitive type, or an array of
    * the preceding. */
-  public static Object readObject(DataInput in)
+  public static Object readObject(DataInput in, NutchConf nutchConf)
     throws IOException {
-    return readObject(in, null);
+    return readObject(in, null, nutchConf);
   }
     
   /** Read a {@link Writable}, {@link String}, primitive type, or an array of
    * the preceding. */
-  public static Object readObject(DataInput in, ObjectWritable objectWritable)
+  public static Object readObject(DataInput in, ObjectWritable objectWritable, NutchConf nutchConf)
     throws IOException {
     String className = UTF8.readString(in);
     Class declaredClass = (Class)PRIMITIVE_NAMES.get(className);
@@ -220,7 +224,7 @@
       int length = in.readInt();
       instance = Array.newInstance(declaredClass.getComponentType(), length);
       for (int i = 0; i < length; i++) {
-        Array.set(instance, i, readObject(in));
+        Array.set(instance, i, readObject(in, nutchConf));
       }
       
     } else if (declaredClass == String.class) {        // String
@@ -229,6 +233,9 @@
     } else {                                      // Writable
       try {
         Writable writable = (Writable)declaredClass.newInstance();
+        if(writable instanceof NutchConfigurable) {
+          ((NutchConfigurable) writable).setConf(nutchConf);
+        }
         writable.readFields(in);
         instance = writable;
       } catch (InstantiationException e) {
@@ -245,6 +252,14 @@
 
     return instance;
       
+  }
+
+  public void setConf(NutchConf conf) {
+    this.nutchConf = conf;
+  }
+
+  public NutchConf getConf() {
+    return this.nutchConf;
   }
   
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java Tue Jan 31 08:08:58 2006
@@ -222,10 +222,12 @@
     private byte[] inflateIn = new byte[1024];
     private DataOutputBuffer inflateOut = new DataOutputBuffer();
     private Inflater inflater = new Inflater();
+    private NutchConf nutchConf;
 
     /** Open the named file. */
-    public Reader(NutchFileSystem nfs, String file) throws IOException {
-      this(nfs, file, NutchConf.get().getInt("io.file.buffer.size", 4096));
+    public Reader(NutchFileSystem nfs, String file, NutchConf nutchConf) throws IOException {
+      this(nfs, file, nutchConf.getInt("io.file.buffer.size", 4096));
+      this.nutchConf = nutchConf;
     }
 
     private Reader(NutchFileSystem nfs, String name, int bufferSize) throws IOException {
@@ -339,7 +341,9 @@
           }
           inBuf.reset(inflateOut.getData(), inflateOut.getLength());
         }
-
+        if(val instanceof NutchConfigurable) {
+          ((NutchConfigurable) val).setConf(this.nutchConf);
+        }
         val.readFields(inBuf);
 
         if (inBuf.getPosition() != inBuf.getLength())
@@ -386,9 +390,9 @@
 
     private void handleChecksumException(ChecksumException e)
       throws IOException {
-      if (NutchConf.get().getBoolean("io.skip.checksum.errors", false)) {
+      if (this.nutchConf.getBoolean("io.skip.checksum.errors", false)) {
         LOG.warning("Bad checksum at "+getPosition()+". Skipping entries.");
-        sync(getPosition()+NutchConf.get().getInt("io.bytes.per.checksum", 512));
+        sync(getPosition()+this.nutchConf.getInt("io.bytes.per.checksum", 512));
       } else {
         throw e;
       }
@@ -449,10 +453,6 @@
    * very efficient.  In particular, it should avoid allocating memory.
    */
   public static class Sorter {
-    private static final int FACTOR =
-      NutchConf.get().getInt("io.sort.factor", 100); 
-    private static final int MEGABYTES =
-      NutchConf.get().getInt("io.sort.mb", 100); 
 
     private WritableComparator comparator;
 
@@ -461,25 +461,30 @@
 
     private String outFile;
 
-    private int memory = MEGABYTES * 1024*1024;   // bytes
-    private int factor = FACTOR;                  // merged per pass
+    private int memory; // bytes
+    private int factor; // merged per pass
 
     private NutchFileSystem nfs = null;
 
     private Class keyClass;
     private Class valClass;
 
+    private NutchConf nutchConf;
+
     /** Sort and merge files containing the named classes. */
-    public Sorter(NutchFileSystem nfs, Class keyClass, Class valClass)  {
-      this(nfs, new WritableComparator(keyClass), valClass);
+    public Sorter(NutchFileSystem nfs, Class keyClass, Class valClass, NutchConf nutchConf)  {
+      this(nfs, new WritableComparator(keyClass), valClass, nutchConf);
     }
 
     /** Sort and merge using an arbitrary {@link WritableComparator}. */
-    public Sorter(NutchFileSystem nfs, WritableComparator comparator, Class valClass) {
+    public Sorter(NutchFileSystem nfs, WritableComparator comparator, Class valClass, NutchConf nutchConf) {
       this.nfs = nfs;
       this.comparator = comparator;
       this.keyClass = comparator.getKeyClass();
       this.valClass = valClass;
+      this.memory = nutchConf.getInt("io.sort.mb", 100) * 1024 * 1024;
+      this.factor = nutchConf.getInt("io.sort.factor", 100);
+      this.nutchConf = nutchConf;
     }
 
     /** Set the number of streams to merge at once.*/
@@ -513,7 +518,7 @@
 
     private int sortPass() throws IOException {
       LOG.fine("running sort pass");
-      SortPass sortPass = new SortPass();         // make the SortPass
+      SortPass sortPass = new SortPass(this.nutchConf);         // make the SortPass
       try {
         return sortPass.run();                    // run it
       } finally {
@@ -536,8 +541,8 @@
       private NFSDataOutputStream out;
         private String outName;
 
-      public SortPass() throws IOException {
-        in = new Reader(nfs, inFile);
+      public SortPass(NutchConf nutchConf) throws IOException {
+        in = new Reader(nfs, inFile, nutchConf);
       }
       
       public int run() throws IOException {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java Tue Jan 31 08:08:58 2006
@@ -51,14 +51,14 @@
   public static class Reader extends MapFile.Reader {
 
     /** Construct a set reader for the named set.*/
-    public Reader(NutchFileSystem nfs, String dirName) throws IOException {
-      super(nfs, dirName);
+    public Reader(NutchFileSystem nfs, String dirName, NutchConf nutchConf) throws IOException {
+      super(nfs, dirName, nutchConf);
     }
 
     /** Construct a set reader for the named set using the named comparator.*/
-    public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator)
+    public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator, NutchConf nutchConf)
       throws IOException {
-      super(nfs, dirName, comparator);
+      super(nfs, dirName, comparator, nutchConf);
     }
 
     // javadoc inherited

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java Tue Jan 31 08:08:58 2006
@@ -35,6 +35,7 @@
 
 import org.apache.nutch.util.LogFormatter;
 import org.apache.nutch.util.NutchConf;
+import org.apache.nutch.util.NutchConfigurable;
 import org.apache.nutch.io.Writable;
 import org.apache.nutch.io.UTF8;
 
@@ -52,9 +53,10 @@
   private Hashtable connections = new Hashtable();
 
   private Class valueClass;                       // class of call values
-  private int timeout = NutchConf.get().getInt("ipc.client.timeout",10000);                    // timeout for calls
+  private int timeout ;// timeout for calls
   private int counter;                            // counter for call ids
   private boolean running = true;                 // true while client runs
+  private NutchConf nutchConf;
 
   /** A call waiting for a value. */
   private class Call {
@@ -160,6 +162,9 @@
             Writable value = makeValue();
             try {
               readingCall = call;
+              if(value instanceof NutchConfigurable) {
+                ((NutchConfigurable) value).setConf(nutchConf);
+              }
               value.readFields(in);                 // read value
             } finally {
               readingCall = null;
@@ -256,8 +261,10 @@
 
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
-  public Client(Class valueClass) {
+  public Client(Class valueClass, NutchConf nutchConf) {
     this.valueClass = valueClass;
+    this.timeout = nutchConf.getInt("ipc.client.timeout",10000);
+    this.nutchConf = nutchConf;
   }
 
   /** Stop all threads related to this client.  No further calls may be made

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java Tue Jan 31 08:08:58 2006
@@ -56,10 +56,11 @@
 
 
   /** A method invocation, including the method name and its parameters.*/
-  private static class Invocation implements Writable {
+  private static class Invocation implements Writable, NutchConfigurable {
     private String methodName;
     private Class[] parameterClasses;
     private Object[] parameters;
+    private NutchConf nutchConf;
 
     public Invocation() {}
 
@@ -82,10 +83,9 @@
       methodName = UTF8.readString(in);
       parameters = new Object[in.readInt()];
       parameterClasses = new Class[parameters.length];
-
       ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable);
+        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.nutchConf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
@@ -111,15 +111,29 @@
       return buffer.toString();
     }
 
+    public void setConf(NutchConf conf) {
+      this.nutchConf = conf;
+    }
+
+    public NutchConf getConf() {
+      return this.nutchConf;
+    }
+
   }
 
-  private static Client CLIENT = new Client(ObjectWritable.class);
+  //TODO mb@media-style.com: static client or non-static client?
+  private static Client CLIENT;
 
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
 
-    public Invoker(InetSocketAddress address) {
+    public Invoker(InetSocketAddress address, NutchConf nutchConf) {
       this.address = address;
+      CLIENT = (Client) nutchConf.getObject(Client.class.getName());
+      if(CLIENT == null) {
+          CLIENT = new Client(ObjectWritable.class, nutchConf);
+          nutchConf.setObject(Client.class.getName(), CLIENT);
+      }
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -132,21 +146,25 @@
 
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static Object getProxy(Class protocol, InetSocketAddress addr) {
+  public static Object getProxy(Class protocol, InetSocketAddress addr, NutchConf nutchConf) {
     return Proxy.newProxyInstance(protocol.getClassLoader(),
                                   new Class[] { protocol },
-                                  new Invoker(addr));
+                                  new Invoker(addr, nutchConf));
   }
 
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public static Object[] call(Method method, Object[][] params,
-                              InetSocketAddress[] addrs)
+                              InetSocketAddress[] addrs, NutchConf nutchConf)
     throws IOException {
 
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
-    
+    CLIENT = (Client) nutchConf.getObject(Client.class.getName());
+    if(CLIENT == null) {
+        CLIENT = new Client(ObjectWritable.class, nutchConf);
+        nutchConf.setObject(Client.class.getName(), CLIENT);
+    }
     Writable[] wrappedValues = CLIENT.call(invocations, addrs);
     
     if (method.getReturnType() == Void.TYPE) {
@@ -165,16 +183,16 @@
 
   /** Construct a server for a protocol implementation instance listening on a
    * port. */
-  public static Server getServer(final Object instance, final int port) {
-    return getServer(instance, port, 1, false);
+  public static Server getServer(final Object instance, final int port, NutchConf nutchConf) {
+    return getServer(instance, port, 1, false, nutchConf);
   }
 
   /** Construct a server for a protocol implementation instance listening on a
    * port. */
   public static Server getServer(final Object instance, final int port,
                                  final int numHandlers,
-                                 final boolean verbose) {
-    return new Server(port, Invocation.class, numHandlers) {
+                                 final boolean verbose, NutchConf nutchConf) {
+    return new Server(port, Invocation.class, numHandlers, nutchConf) {
         
         Class implementation = instance.getClass();
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java Tue Jan 31 08:08:58 2006
@@ -53,7 +53,7 @@
   private int maxQueuedCalls;                     // max number of queued calls
   private Class paramClass;                       // class of call parameters
 
-  private int timeout = NutchConf.get().getInt("ipc.client.timeout",10000);
+  private int timeout;
 
   private boolean running = true;                 // true while server runs
   private LinkedList callQueue = new LinkedList(); // queued calls
@@ -228,11 +228,12 @@
   * be of the named class.  The <code>handlerCount</code> determines
    * the number of handler threads that will be used to process calls.
    */
-  protected Server(int port, Class paramClass, int handlerCount) {
+  protected Server(int port, Class paramClass, int handlerCount, NutchConf nutchConf) {
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.maxQueuedCalls = handlerCount;
+    this.timeout = nutchConf.getInt("ipc.client.timeout",10000); 
   }
 
   /** Sets the timeout used for network i/o. */

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java Tue Jan 31 08:08:58 2006
@@ -28,8 +28,7 @@
  * then invokes the combiner's reduce method to merge some values before
  * they're transferred to a reduce node. */
 class CombiningCollector implements OutputCollector {
-  private static final int LIMIT
-    = NutchConf.get().getInt("mapred.combine.buffer.size", 100000);
+  private int limit;
 
   private int count = 0;
   private Map keyToValues;                        // the buffer
@@ -46,6 +45,7 @@
     this.reporter = reporter;
     this.combiner = (Reducer)job.newInstance(job.getCombinerClass());
     this.keyToValues = new TreeMap(job.getOutputKeyComparator());
+    this.limit = job.getInt("mapred.combine.buffer.size", 100000);
   }
 
   public synchronized void collect(WritableComparable key, Writable value)
@@ -63,7 +63,7 @@
 
     count++;
 
-    if (count >= LIMIT) {                         // time to flush
+    if (count >= this.limit) {                         // time to flush
       flush();
     }
   }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java Tue Jan 31 08:08:58 2006
@@ -166,28 +166,30 @@
     JobSubmissionProtocol jobSubmitClient;
     NutchFileSystem fs = null;
 
+    private NutchConf nutchConf;
     static Random r = new Random();
 
     /**
      * Build a job client, connect to the default job tracker
      */
     public JobClient(NutchConf conf) throws IOException {
+      this.nutchConf = conf;
       String tracker = conf.get("mapred.job.tracker", "local");
       if ("local".equals(tracker)) {
-        this.jobSubmitClient = new LocalJobRunner();
+        this.jobSubmitClient = new LocalJobRunner(conf);
       } else {
         this.jobSubmitClient = (JobSubmissionProtocol) 
           RPC.getProxy(JobSubmissionProtocol.class,
-                       JobTracker.getAddress(conf));
+                       JobTracker.getAddress(conf), conf);
       }
     }
   
     /**
      * Build a job client, connect to the indicated job tracker.
      */
-    public JobClient(InetSocketAddress jobTrackAddr) throws IOException {
+    public JobClient(InetSocketAddress jobTrackAddr, NutchConf nutchConf) throws IOException {
         this.jobSubmitClient = (JobSubmissionProtocol) 
-            RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr);
+            RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr, nutchConf);
     }
 
 
@@ -207,7 +209,7 @@
     public synchronized NutchFileSystem getFs() throws IOException {
       if (this.fs == null) {
         String fsName = jobSubmitClient.getFilesystemName();
-        this.fs = NutchFileSystem.getNamed(fsName);
+        this.fs = NutchFileSystem.getNamed(fsName, this.nutchConf);
       }
       return fs;
     }
@@ -234,7 +236,7 @@
         //
 
         // Create a number of filenames in the JobTracker's fs namespace
-        File submitJobDir = new File(JobConf.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()),36));
+        File submitJobDir = new File(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
         File submitJobFile = new File(submitJobDir, "job.xml");
         File submitJarFile = new File(submitJobDir, "job.jar");
 
@@ -349,7 +351,7 @@
         }
 
         // Submit the request
-        JobClient jc = new JobClient(NutchConf.get());
+        JobClient jc = new JobClient(new NutchConf());
         try {
             if (submitJobFile != null) {
                 RunningJob job = jc.submitJob(submitJobFile);

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java Tue Jan 31 08:08:58 2006
@@ -50,9 +50,15 @@
  * of input files, and where the output files should be written. */
 public class JobConf extends NutchConf {
 
-  /** Construct a map/reduce job configuration.
-   *
-   * @param conf a NutchConf whose settings will be inherited.
+  public JobConf() {
+    super();
+  }
+    
+  /**
+   * Construct a map/reduce job configuration.
+   * 
+   * @param conf
+   *          a NutchConf whose settings will be inherited.
    */
   public JobConf(NutchConf conf) {
     super(conf);
@@ -81,32 +87,32 @@
   public String getJar() { return get("mapred.jar"); }
   public void setJar(String jar) { set("mapred.jar", jar); }
 
-  public static File getSystemDir() {
-    return new File(NutchConf.get().get("mapred.system.dir",
+  public File getSystemDir() {
+    return new File(get("mapred.system.dir",
                                         "/tmp/nutch/mapred/system"));
   }
 
-  public static String[] getLocalDirs() throws IOException {
-    return NutchConf.get().getStrings("mapred.local.dir");
+  public String[] getLocalDirs() throws IOException {
+    return getStrings("mapred.local.dir");
   }
 
-  public static void deleteLocalFiles() throws IOException {
+  public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
-      FileUtil.fullyDelete(new File(localDirs[i]));
+      FileUtil.fullyDelete(new File(localDirs[i]), this);
     }
   }
 
-  public static void deleteLocalFiles(String subdir) throws IOException {
+  public void deleteLocalFiles(String subdir) throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
-      FileUtil.fullyDelete(new File(localDirs[i], subdir));
+      FileUtil.fullyDelete(new File(localDirs[i], subdir), this);
     }
   }
 
   /** Constructs a local file name.  Files are distributed among configured
    * local directories.*/
-  public static File getLocalFile(String subdir, String name)
+  public File getLocalFile(String subdir, String name)
     throws IOException {
     String[] localDirs = getLocalDirs();
     String path = subdir + File.separator + name;

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java Tue Jan 31 08:08:58 2006
@@ -210,25 +210,29 @@
     NutchFileSystem fs;
     File systemDir;
 
+    private NutchConf nutchConf;
+
     /**
      * Start the JobTracker process, listen on the indicated port
      */
     JobTracker(NutchConf conf) throws IOException {
         // This is a directory of temporary submission files.  We delete it
         // on startup, and can delete any files that we're done with
-        this.systemDir = JobConf.getSystemDir();
-        this.fs = NutchFileSystem.get();
+        JobConf jobConf = new JobConf(conf);
+        this.systemDir = jobConf.getSystemDir();
+        this.nutchConf = conf;
+        this.fs = NutchFileSystem.get(conf);
         FileUtil.fullyDelete(fs, systemDir);
         fs.mkdirs(systemDir);
 
         // Same with 'localDir' except it's always on the local disk.
-        JobConf.deleteLocalFiles(SUBDIR);
+        jobConf.deleteLocalFiles(SUBDIR);
 
         // Set ports, start RPC servers, etc.
         InetSocketAddress addr = getAddress(conf);
         this.localMachine = addr.getHostName();
         this.port = addr.getPort();
-        this.interTrackerServer = RPC.getServer(this,addr.getPort(),10,false);
+        this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
         this.interTrackerServer.start();
 	Properties p = System.getProperties();
 	for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
@@ -529,7 +533,7 @@
      * task allocation.)
      */
     JobInProgress createJob(String jobFile) throws IOException {
-        JobInProgress job = new JobInProgress(jobFile);
+        JobInProgress job = new JobInProgress(jobFile, this.nutchConf);
         jobs.put(job.getProfile().getJobId(), job);
 
         boolean error = true;
@@ -576,6 +580,7 @@
         long startTime;
         long finishTime;
         String deleteUponCompletion = null;
+        private NutchConf nutchConf;
 
         /**
          * Create a 'JobInProgress' object, which contains both JobProfile
@@ -583,13 +588,13 @@
          * of the JobTracker.  But JobInProgress adds info that's useful for
          * the JobTracker alone.
          */
-        public JobInProgress(String jobFile) throws IOException {
+        public JobInProgress(String jobFile, NutchConf nutchConf) throws IOException {
             String jobid = createJobId();
             String url = "http://" + localMachine + ":" + infoPort + "/jobdetails.jsp?jobid=" + jobid;
             this.profile = new JobProfile(jobid, jobFile, url);
             this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.RUNNING);
 
-            this.localJobFile = JobConf.getLocalFile(SUBDIR, jobid+".xml");
+            this.localJobFile = new JobConf(nutchConf).getLocalFile(SUBDIR, jobid + ".xml");
             fs.copyToLocalFile(new File(jobFile), localJobFile);
 
             JobConf jd = new JobConf(localJobFile);
@@ -602,6 +607,7 @@
             if (jobFile.startsWith(systemDir.getPath())) {
                 this.deleteUponCompletion = jobFile;
             }
+            this.nutchConf = nutchConf;
         }
 
         /**
@@ -613,7 +619,7 @@
 
             // construct input splits
             JobConf jd = new JobConf(localJobFile);
-            NutchFileSystem fs = NutchFileSystem.get();
+            NutchFileSystem fs = NutchFileSystem.get(nutchConf);
             FileSplit[] splits =
               jd.getInputFormat().getSplits(fs, jd, numMapTasks);
 
@@ -634,6 +640,7 @@
             for (int i = 0; i < numMapTasks; i++) {
                 mapIds[i] = createMapTaskId();
                 Task t = new MapTask(jobFile, mapIds[i], splits[i]);
+                t.setConf(this.nutchConf);
 
                 incompleteMapTasks.put(mapIds[i], t);
                 taskToJobMap.put(mapIds[i], jobid);
@@ -643,6 +650,7 @@
             for (int i = 0; i < numReduceTasks; i++) {
                 String taskid = createReduceTaskId();
                 Task t = new ReduceTask(jobFile, taskid, mapIds, i);
+                t.setConf(this.nutchConf);
                 reducesToLaunch.add(t);
                 taskToJobMap.put(taskid, jobid);
             }
@@ -1067,6 +1075,6 @@
           System.exit(-1);
         }
 
-        startTracker(NutchConf.get());
+        startTracker(new NutchConf());
     }
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java Tue Jan 31 08:08:58 2006
@@ -31,6 +31,7 @@
 
   private NutchFileSystem fs;
   private HashMap jobs = new HashMap();
+  private NutchConf nutchConf;
 
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
@@ -40,15 +41,19 @@
 
     private JobStatus status = new JobStatus();
     private ArrayList mapIds = new ArrayList();
+    private MapOutputFile mapoutputFile;
 
-    public Job(String file) throws IOException {
+    public Job(String file, NutchConf nutchConf) throws IOException {
       this.file = file;
       this.id = "job_" + newId();
+      this.mapoutputFile = new MapOutputFile();
+      this.mapoutputFile.setConf(nutchConf);
 
-      File localFile = JobConf.getLocalFile("localRunner", id+".xml");
+      File localFile = new JobConf(nutchConf).getLocalFile("localRunner", id+".xml");
       fs.copyToLocalFile(new File(file), localFile);
       this.job = new JobConf(localFile);
-
+      
+      
       this.status.jobid = id;
       this.status.runState = JobStatus.RUNNING;
 
@@ -67,6 +72,7 @@
         for (int i = 0; i < splits.length; i++) {
           mapIds.add("map_" + newId());
           MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
+          map.setConf(job);
           map.run(job, this);
         }
 
@@ -74,12 +80,12 @@
         String reduceId = "reduce_" + newId();
         for (int i = 0; i < mapIds.size(); i++) {
           String mapId = (String)mapIds.get(i);
-          File mapOut = MapOutputFile.getOutputFile(mapId, 0);
-          File reduceIn = MapOutputFile.getInputFile(mapId, reduceId);
+          File mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
+          File reduceIn = this.mapoutputFile.getInputFile(mapId, reduceId);
           reduceIn.getParentFile().mkdirs();
-          if (!NutchFileSystem.getNamed("local").rename(mapOut, reduceIn))
+          if (!NutchFileSystem.getNamed("local", this.job).rename(mapOut, reduceIn))
             throw new IOException("Couldn't rename " + mapOut);
-          MapOutputFile.removeAll(mapId);
+          this.mapoutputFile.removeAll(mapId);
         }
 
         // run a single reduce task
@@ -87,8 +93,9 @@
           new ReduceTask(file, reduceId,
                          (String[])mapIds.toArray(new String[0]),
                          0);
+        reduce.setConf(job);
         reduce.run(job, this);
-        MapOutputFile.removeAll(reduceId);
+        this.mapoutputFile.removeAll(reduceId);
         
         this.status.runState = JobStatus.SUCCEEDED;
 
@@ -138,14 +145,15 @@
 
   }
 
-  public LocalJobRunner() throws IOException {
-    this.fs = NutchFileSystem.get();
+  public LocalJobRunner(NutchConf nutchConf) throws IOException {
+    this.fs = NutchFileSystem.get(nutchConf);
+    this.nutchConf = nutchConf;
   }
 
   // JobSubmissionProtocol methods
 
   public JobStatus submitJob(String jobFile) throws IOException {
-    return new Job(jobFile).status;
+    return new Job(jobFile, this.nutchConf).status;
   }
 
   public void killJob(String id) {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java Tue Jan 31 08:08:58 2006
@@ -25,6 +25,7 @@
 import org.apache.nutch.io.MapFile;
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.Writable;
+import org.apache.nutch.util.NutchConf;
 
 public class MapFileOutputFormat implements OutputFormat {
 
@@ -52,7 +53,7 @@
   }
 
   /** Open the output generated by this format. */
-  public static MapFile.Reader[] getReaders(NutchFileSystem fs, File dir)
+  public static MapFile.Reader[] getReaders(NutchFileSystem fs, File dir, NutchConf nutchConf)
     throws IOException {
     File[] names = fs.listFiles(dir);
     
@@ -61,7 +62,7 @@
     
     MapFile.Reader[] parts = new MapFile.Reader[names.length];
     for (int i = 0; i < names.length; i++) {
-      parts[i] = new MapFile.Reader(fs, names[i].toString());
+      parts[i] = new MapFile.Reader(fs, names[i].toString(), nutchConf);
     }
     return parts;
   }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java Tue Jan 31 08:08:58 2006
@@ -24,19 +24,20 @@
 import org.apache.nutch.util.*;
 
 /** A local file to be transferred via the {@link MapOutputProtocol}. */ 
-public class MapOutputFile implements Writable {
+public class MapOutputFile implements Writable, NutchConfigurable {
   private String mapTaskId;
   private String reduceTaskId;
   private int partition;
   
   /** Permits reporting of file copy progress. */
-  public static interface ProgressReporter {
+  public interface ProgressReporter {
     void progress(float progress) throws IOException;
   }
 
-  private static final ThreadLocal REPORTERS = new ThreadLocal();
+  private ThreadLocal REPORTERS = new ThreadLocal();
+  private JobConf jobConf;
   
-  public static void setProgressReporter(ProgressReporter reporter) {
+  public void setProgressReporter(ProgressReporter reporter) {
     REPORTERS.set(reporter);
   }
 
@@ -44,36 +45,37 @@
    * @param mapTaskId a map task id
    * @param partition a reduce partition
    */
-  public static File getOutputFile(String mapTaskId, int partition)
+  public File getOutputFile(String mapTaskId, int partition)
     throws IOException {
-    return JobConf.getLocalFile(mapTaskId, "part-"+partition+".out");
+    return this.jobConf.getLocalFile(mapTaskId, "part-"+partition+".out");
   }
 
   /** Create a local reduce input file name.
    * @param mapTaskId a map task id
    * @param reduceTaskId a reduce task id
    */
-  public static File getInputFile(String mapTaskId, String reduceTaskId)
+  public File getInputFile(String mapTaskId, String reduceTaskId)
     throws IOException {
-    return JobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
+    return this.jobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
   }
 
   /** Removes all of the files related to a task. */
-  public static void removeAll(String taskId) throws IOException {
-    JobConf.deleteLocalFiles(taskId);
+  public void removeAll(String taskId) throws IOException {
+    this.jobConf.deleteLocalFiles(taskId);
   }
 
   /** 
    * Removes all contents of temporary storage.  Called upon 
    * startup, to remove any leftovers from previous run.
    */
-  public static void cleanupStorage() throws IOException {
-    JobConf.deleteLocalFiles();
+  public void cleanupStorage() throws IOException {
+    this.jobConf.deleteLocalFiles();
   }
 
   /** Construct a file for transfer. */
-  public MapOutputFile() {
+  public MapOutputFile() { 
   }
+  
   public MapOutputFile(String mapTaskId, String reduceTaskId, int partition) {
     this.mapTaskId = mapTaskId;
     this.reduceTaskId = reduceTaskId;
@@ -88,7 +90,7 @@
     // write the length-prefixed file content to the wire
     File file = getOutputFile(mapTaskId, partition);
     out.writeLong(file.length());
-    NFSDataInputStream in = NutchFileSystem.getNamed("local").open(file);
+    NFSDataInputStream in = NutchFileSystem.getNamed("local", this.jobConf).open(file);
     try {
       byte[] buffer = new byte[8192];
       int l;
@@ -112,7 +114,7 @@
     long length = in.readLong();
     float progPerByte = 1.0f / length;
     long unread = length;
-    NFSDataOutputStream out = NutchFileSystem.getNamed("local").create(file);
+    NFSDataOutputStream out = NutchFileSystem.getNamed("local", this.jobConf).create(file);
     try {
       byte[] buffer = new byte[8192];
       while (unread > 0) {
@@ -127,6 +129,14 @@
     } finally {
       out.close();
     }
+  }
+
+  public void setConf(NutchConf conf) {
+    this.jobConf = new JobConf(conf);
+  }
+
+  public NutchConf getConf() {
+    return this.jobConf;
   }
 
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java Tue Jan 31 08:08:58 2006
@@ -27,6 +27,8 @@
 /** A Map task. */
 public class MapTask extends Task {
   private FileSplit split;
+  private MapOutputFile mapOutputFile;
+  private NutchConf nutchConf;
 
   public MapTask() {}
 
@@ -36,7 +38,7 @@
   }
 
   public TaskRunner createRunner(TaskTracker tracker) {
-    return new MapTaskRunner(this, tracker);
+    return new MapTaskRunner(this, tracker, this.nutchConf);
   }
 
   public FileSplit getSplit() { return split; }
@@ -62,8 +64,8 @@
     try {
       for (int i = 0; i < partitions; i++) {
         outs[i] =
-          new SequenceFile.Writer(NutchFileSystem.getNamed("local"),
-                                  MapOutputFile.getOutputFile(getTaskId(), i).toString(),
+          new SequenceFile.Writer(NutchFileSystem.getNamed("local", job),
+                                  this.mapOutputFile.getOutputFile(getTaskId(), i).toString(),
                                   job.getOutputKeyClass(),
                                   job.getOutputValueClass());
       }
@@ -91,7 +93,7 @@
 
       final RecordReader rawIn =                  // open input
         job.getInputFormat().getRecordReader
-        (NutchFileSystem.get(), split, job, reporter);
+        (NutchFileSystem.get(job), split, job, reporter);
 
       RecordReader in = new RecordReader() {      // wrap in progress reporter
           private float perByte = 1.0f /(float)split.getLength();
@@ -130,6 +132,16 @@
       }
     }
     done(umbilical);
+  }
+
+  public void setConf(NutchConf conf) {
+    this.nutchConf = conf;
+    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile.setConf(conf);
+  }
+
+  public NutchConf getConf() {
+    return this.nutchConf;
   }
   
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java Tue Jan 31 08:08:58 2006
@@ -26,17 +26,22 @@
 
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
-  public MapTaskRunner(Task task, TaskTracker tracker) {
-    super(task, tracker);
+  private MapOutputFile mapOutputFile;
+
+  public MapTaskRunner(Task task, TaskTracker tracker, NutchConf nutchConf) {
+    super(task, tracker, nutchConf);
+    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile.setConf(nutchConf);
   }
+  
   /** Delete any temporary files from previous failed attempts. */
   public void prepare() throws IOException {
-    MapOutputFile.removeAll(getTask().getTaskId());
+    this.mapOutputFile.removeAll(getTask().getTaskId());
   }
 
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
-    MapOutputFile.removeAll(getTask().getTaskId());
+    this.mapOutputFile.removeAll(getTask().getTaskId());
   }
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java Tue Jan 31 08:08:58 2006
@@ -37,6 +37,8 @@
   private Progress appendPhase = getProgress().addPhase("append");
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
+  private NutchConf nutchConf;
+  private MapOutputFile mapOutputFile;
 
   public ReduceTask() {}
 
@@ -48,7 +50,7 @@
   }
 
   public TaskRunner createRunner(TaskTracker tracker) {
-    return new ReduceTaskRunner(this, tracker);
+    return new ReduceTaskRunner(this, tracker, this.nutchConf);
   }
 
   public String[] getMapTaskIds() { return mapTaskIds; }
@@ -157,7 +159,8 @@
     Class keyClass = job.getOutputKeyClass();
     Class valueClass = job.getOutputValueClass();
     Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
-    NutchFileSystem lfs = NutchFileSystem.getNamed("local");
+    reducer.configure(job);
+    NutchFileSystem lfs = NutchFileSystem.getNamed("local", job);
 
     copyPhase.complete();                         // copy is already complete
 
@@ -175,12 +178,12 @@
 
       for (int i = 0; i < mapTaskIds.length; i++) {
         File partFile =
-          MapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
+          this.mapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
         float progPerByte = 1.0f / lfs.getLength(partFile);
         Progress phase = appendPhase.phase();
         phase.setStatus(partFile.toString());
         SequenceFile.Reader in =
-          new SequenceFile.Reader(lfs, partFile.toString());
+          new SequenceFile.Reader(lfs, partFile.toString(), job);
         try {
           int keyLen;
           while((keyLen = in.next(buffer)) > 0) {
@@ -227,7 +230,7 @@
 
       // sort the input file
       SequenceFile.Sorter sorter =
-        new SequenceFile.Sorter(lfs, comparator, valueClass);
+        new SequenceFile.Sorter(lfs, comparator, valueClass, job);
       sorter.sort(file, sortedFile);              // sort
       lfs.delete(new File(file));                 // remove unsorted
 
@@ -240,7 +243,7 @@
     // make output collector
     String name = getOutputName(getPartition());
     final RecordWriter out =
-      job.getOutputFormat().getRecordWriter(NutchFileSystem.get(), job, name);
+      job.getOutputFormat().getRecordWriter(NutchFileSystem.get(job), job, name);
     OutputCollector collector = new OutputCollector() {
         public void collect(WritableComparable key, Writable value)
           throws IOException {
@@ -250,7 +253,7 @@
       };
     
     // apply reduce function
-    SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile);
+    SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
     Reporter reporter = getReporter(umbilical, getProgress());
     long length = lfs.getLength(new File(sortedFile));
     try {
@@ -266,7 +269,6 @@
       lfs.delete(new File(sortedFile));           // remove sorted
       out.close(reporter);
     }
-
     done(umbilical);
   }
 
@@ -282,6 +284,16 @@
 
   private static synchronized String getOutputName(int partition) {
     return "part-" + NUMBER_FORMAT.format(partition);
+  }
+
+  public void setConf(NutchConf conf) {
+    this.nutchConf = conf;
+    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile.setConf(conf);
+  }
+
+  public NutchConf getConf() {
+    return this.nutchConf;
   }
 
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Tue Jan 31 08:08:58 2006
@@ -28,15 +28,18 @@
 class ReduceTaskRunner extends TaskRunner {
   private static final Logger LOG =
     LogFormatter.getLogger("org.apache.nutch.mapred.ReduceTaskRunner");
+  private MapOutputFile mapOutputFile;
 
-  public ReduceTaskRunner(Task task, TaskTracker tracker) {
-    super(task, tracker);
+  public ReduceTaskRunner(Task task, TaskTracker tracker, NutchConf nutchConf) {
+    super(task, tracker, nutchConf);
+    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile.setConf(nutchConf);
   }
 
   /** Assemble all of the map output files. */
   public void prepare() throws IOException {
     ReduceTask task = ((ReduceTask)getTask());
-    MapOutputFile.removeAll(task.getTaskId());    // cleanup from failures
+    this.mapOutputFile.removeAll(task.getTaskId());    // cleanup from failures
     String[] mapTaskIds = task.getMapTaskIds();
     final Progress copyPhase = getTask().getProgress().phase();
 
@@ -74,9 +77,9 @@
         InetSocketAddress addr =
           new InetSocketAddress(loc.getHost(), loc.getPort());
         MapOutputProtocol client =
-          (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr);
+          (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr, this.nutchConf);
 
-        MapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter(){
+        this.mapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter() {
             public void progress(float progress) {
               copyPhase.phase().set(progress);
               try {
@@ -104,7 +107,7 @@
                   +loc.getMapTaskId()+" from "+addr,
                   e);
         } finally {
-          MapOutputFile.setProgressReporter(null);
+          this.mapOutputFile.setProgressReporter(null);
         }
         
       }
@@ -116,7 +119,7 @@
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
     getTask().getProgress().setStatus("closed");
-    MapOutputFile.removeAll(getTask().getTaskId());
+    this.mapOutputFile.removeAll(getTask().getTaskId());
   }
 
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Tue Jan 31 08:08:58 2006
@@ -54,7 +54,7 @@
 
     reporter.setStatus(split.toString());
 
-    return new SequenceFileRecordReader(fs, split);
+    return new SequenceFileRecordReader(job, split);
   }
 
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java Tue Jan 31 08:08:58 2006
@@ -26,6 +26,7 @@
 import org.apache.nutch.io.SequenceFile;
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.Writable;
+import org.apache.nutch.util.NutchConf;
 
 public class SequenceFileOutputFormat implements OutputFormat {
 
@@ -53,8 +54,9 @@
   }
 
   /** Open the output generated by this format. */
-  public static SequenceFile.Reader[] getReaders(NutchFileSystem fs, File dir)
+  public static SequenceFile.Reader[] getReaders(NutchConf nutchConf, File dir)
     throws IOException {
+    NutchFileSystem fs = NutchFileSystem.get(nutchConf);
     File[] names = fs.listFiles(dir);
     
     // sort names, so that hash partitioning works
@@ -62,7 +64,7 @@
     
     SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
     for (int i = 0; i < names.length; i++) {
-      parts[i] = new SequenceFile.Reader(fs, names[i].toString());
+      parts[i] = new SequenceFile.Reader(fs, names[i].toString(), nutchConf);
     }
     return parts;
   }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Tue Jan 31 08:08:58 2006
@@ -26,6 +26,7 @@
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.LongWritable;
 import org.apache.nutch.io.UTF8;
+import org.apache.nutch.util.NutchConf;
 
 /** An {@link RecordReader} for {@link SequenceFile}s. */
 public class SequenceFileRecordReader implements RecordReader {
@@ -33,9 +34,10 @@
   private long end;
   private boolean more = true;
 
-  public SequenceFileRecordReader(NutchFileSystem fs, FileSplit split)
+  public SequenceFileRecordReader(NutchConf nutchConf, FileSplit split)
     throws IOException {
-    this.in = new SequenceFile.Reader(fs, split.getFile().toString());
+    NutchFileSystem fs = NutchFileSystem.get(nutchConf);
+    this.in = new SequenceFile.Reader(fs, split.getFile().toString(), nutchConf);
     this.end = split.getStart() + split.getLength();
 
     if (split.getStart() > in.getPosition())

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java Tue Jan 31 08:08:58 2006
@@ -24,14 +24,13 @@
 import java.util.*;
 
 /** Base class for tasks. */
-public abstract class Task implements Writable {
+public abstract class Task implements Writable, NutchConfigurable {
   ////////////////////////////////////////////
   // Fields
   ////////////////////////////////////////////
 
   private String jobFile;                         // job configuration file
   private String taskId;                          // unique, includes job id
-
   ////////////////////////////////////////////
   // Constructors
   ////////////////////////////////////////////

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java Tue Jan 31 08:08:58 2006
@@ -37,9 +37,12 @@
   private Task t;
   private TaskTracker tracker;
 
-  public TaskRunner(Task t, TaskTracker tracker) {
+  protected NutchConf nutchConf;
+
+  public TaskRunner(Task t, TaskTracker tracker, NutchConf nutchConf) {
     this.t = t;
     this.tracker = tracker;
+    this.nutchConf = nutchConf;
   }
 
   public Task getTask() { return t; }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Jan 31 08:08:58 2006
@@ -33,13 +33,9 @@
  * @author Mike Cafarella
  *******************************************************/
 public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
-    private static final int MAX_CURRENT_TASKS = 
-    NutchConf.get().getInt("mapred.tasktracker.tasks.maximum", 2);
-
+    private int maxCurrentTask; 
     static final long WAIT_FOR_DONE = 3 * 1000;
-
-    static final long TASK_TIMEOUT = 
-      NutchConf.get().getInt("mapred.task.timeout", 10* 60 * 1000);
+    private long taskTimeout; 
 
     static final int STALE_STATE = 1;
 
@@ -68,6 +64,7 @@
     static final String SUBDIR = "taskTracker";
 
     private NutchConf fConf;
+    private MapOutputFile mapOutputFile;
 
     /**
      * Start with the local machine name, and the default JobTracker
@@ -82,6 +79,10 @@
     public TaskTracker(InetSocketAddress jobTrackAddr, NutchConf conf) throws IOException {
         this.fConf = conf;
         this.jobTrackAddr = jobTrackAddr;
+        this.maxCurrentTask = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+        this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
+        this.mapOutputFile = new MapOutputFile();
+        this.mapOutputFile.setConf(conf);
         initialize();
     }
 
@@ -94,7 +95,7 @@
         this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
         this.localHostname = InetAddress.getLocalHost().getHostName();
 
-        JobConf.deleteLocalFiles(SUBDIR);
+        new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
 
         // Clear out state tables
         this.tasks = new TreeMap();
@@ -107,7 +108,7 @@
         // RPC initialization
         while (true) {
             try {
-                this.taskReportServer = RPC.getServer(this, this.taskReportPort, MAX_CURRENT_TASKS, false);
+                this.taskReportServer = RPC.getServer(this, this.taskReportPort, this.maxCurrentTask, false, this.fConf);
                 this.taskReportServer.start();
                 break;
             } catch (BindException e) {
@@ -118,7 +119,7 @@
         }
         while (true) {
             try {
-                this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, MAX_CURRENT_TASKS, false);
+                this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, this.maxCurrentTask, false, this.fConf);
                 this.mapOutputServer.start();
                 break;
             } catch (BindException e) {
@@ -128,10 +129,10 @@
         }
 
         // Clear out temporary files that might be lying around
-        MapOutputFile.cleanupStorage();
+        this.mapOutputFile.cleanupStorage();
         this.justStarted = true;
 
-        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr);
+        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
     }
 
     /**
@@ -164,7 +165,7 @@
         }
 
         // Clear local storage
-        MapOutputFile.cleanupStorage();
+        this.mapOutputFile.cleanupStorage();
     }
 
     /**
@@ -213,7 +214,7 @@
             // Xmit the heartbeat
             //
             if (justStarted) {
-                this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName());
+                this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
             }
             
             int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted);
@@ -226,10 +227,10 @@
             //
             // Check if we should create a new Task
             //
-            if (runningTasks.size() < MAX_CURRENT_TASKS) {
+            if (runningTasks.size() < this.maxCurrentTask) {
                 Task t = jobClient.pollForNewTask(taskTrackerName);
                 if (t != null) {
-                    TaskInProgress tip = new TaskInProgress(t);
+                    TaskInProgress tip = new TaskInProgress(t, this.fConf);
                     synchronized (this) {
                       tasks.put(t.getTaskId(), tip);
                       runningTasks.put(t.getTaskId(), tip);
@@ -245,7 +246,7 @@
                 for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
                     TaskInProgress tip = (TaskInProgress) it.next();
                     if ((tip.getRunState() == TaskStatus.RUNNING) &&
-                        (System.currentTimeMillis() - tip.getLastProgressReport() > TASK_TIMEOUT)) {
+                        (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout)) {
                         LOG.info("Task " + tip.getTask().getTaskId() + " timed out.  Killing.");
                         tip.reportDiagnosticInfo("Timed out.");
                         tip.killAndCleanup();
@@ -321,13 +322,15 @@
         TaskRunner runner;
         boolean done = false;
         boolean wasKilled = false;
+        private JobConf jobConf;
 
         /**
          */
-        public TaskInProgress(Task task) throws IOException {
+        public TaskInProgress(Task task, NutchConf nutchConf) throws IOException {
             this.task = task;
             this.lastProgressReport = System.currentTimeMillis();
-            JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
+            this.jobConf = new JobConf(nutchConf);
+            this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
             localizeTask(task);
         }
 
@@ -337,9 +340,9 @@
          */
         void localizeTask(Task t) throws IOException {
             File localJobFile =
-              JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
+              this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
             File localJarFile =
-              JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");
+              this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");
 
             String jobFile = t.getJobFile();
             fs.copyToLocalFile(new File(jobFile), localJobFile);
@@ -501,7 +504,7 @@
                 runner.close();
             } catch (IOException ie) {
             }
-            JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
+            this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
         }
     }
 
@@ -509,11 +512,14 @@
     // MapOutputProtocol
     /////////////////////////////////////////////////////////////////
     public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
-                                 IntWritable partition) {
-        return new MapOutputFile(mapTaskId, reduceTaskId, partition.get());
-    }
+      IntWritable partition) {
+    MapOutputFile mapOutputFile = new MapOutputFile(mapTaskId, reduceTaskId,
+        partition.get());
+    mapOutputFile.setConf(this.fConf);
+    return mapOutputFile;
+  }
 
-    /////////////////////////////////////////////////////////////////
+    // ///////////////////////////////////////////////////////////////
     // TaskUmbilicalProtocol
     /////////////////////////////////////////////////////////////////
     /**
@@ -586,16 +592,17 @@
           LogFormatter.showTime(false);
           LOG.info("Child starting");
 
+          NutchConf nutchConf = new NutchConf();
           int port = Integer.parseInt(args[0]);
           String taskid = args[1];
           TaskUmbilicalProtocol umbilical =
             (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-                                                new InetSocketAddress(port));
+                                                new InetSocketAddress(port), nutchConf);
             
           Task task = umbilical.getTask(taskid);
           JobConf job = new JobConf(task.getJobFile());
 
-          NutchConf.get().addConfResource(new File(task.getJobFile()));
+          nutchConf.addConfResource(new File(task.getJobFile()));
 
           startPinging(umbilical, taskid);        // start pinging parent
 
@@ -646,7 +653,7 @@
             System.exit(-1);
         }
 
-        TaskTracker tt = new TaskTracker(NutchConf.get());
+        TaskTracker tt = new TaskTracker(new NutchConf());
         tt.run();
     }
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java Tue Jan 31 08:08:58 2006
@@ -45,7 +45,7 @@
       System.exit(-1);
     }
 
-    NutchConf defaults = NutchConf.get();
+    NutchConf defaults = new NutchConf();
 
     File tempDir =
       new File("grep-temp-"+

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java Tue Jan 31 08:08:58 2006
@@ -85,8 +85,8 @@
      * Needs a directory to find its data (and config info)
      */
     public DataNode(String machineName, File datadir, InetSocketAddress nameNodeAddr, NutchConf conf) throws IOException {
-        this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr);
-        this.data = new FSDataset(datadir);
+        this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr, conf);
+        this.data = new FSDataset(datadir, conf);
 
         ServerSocket ss = null;
         int tmpPort = conf.getInt("ndfs.datanode.port", 50010);
@@ -748,6 +748,6 @@
      */
     public static void main(String args[]) throws IOException {
         LogFormatter.setShowThreadIDs(true);
-        runAndWait(NutchConf.get());
+        runAndWait(new NutchConf());
     }
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java Tue Jan 31 08:08:58 2006
@@ -107,8 +107,8 @@
     public static long LEASE_PERIOD = 60 * 1000;
     public static int READ_TIMEOUT = 60 * 1000;
 
-    public static int BUFFER_SIZE =
-      NutchConf.get().getInt("io.file.buffer.size", 4096);
+    //TODO mb@media-style.com: should be nutchConf injected?
+    public static final int BUFFER_SIZE = new NutchConf().getInt("io.file.buffer.size", 4096);
 
 }
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java Tue Jan 31 08:08:58 2006
@@ -191,7 +191,7 @@
     /**
      * An FSDataset has a directory where it loads its data files.
      */
-    public FSDataset(File dir) throws IOException {
+    public FSDataset(File dir, NutchConf nutchConf) throws IOException {
         this.dirpath = dir.getCanonicalPath();
         this.data = new File(dir, "data");
         if (! data.exists()) {
@@ -199,7 +199,7 @@
         }
         this.tmp = new File(dir, "tmp");
         if (tmp.exists()) {
-            FileUtil.fullyDelete(tmp);
+            FileUtil.fullyDelete(tmp, nutchConf);
         }
         this.tmp.mkdirs();
         this.dirTree = new FSDir(data);

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java Tue Jan 31 08:08:58 2006
@@ -34,27 +34,7 @@
 public class FSNamesystem implements FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.fs.FSNamesystem");
 
-    // DESIRED_REPLICATION is how many copies we try to have at all times
-    final static int DESIRED_REPLICATION =
-      NutchConf.get().getInt("ndfs.replication", 3);
-
-    // The maximum number of replicates we should allow for a single block
-    final static int MAX_REPLICATION = DESIRED_REPLICATION;
-
-    // How many outgoing replication streams a given node should have at one time
-    final static int MAX_REPLICATION_STREAMS = NutchConf.get().getInt("ndfs.max-repl-streams", 2);
-
-    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
-    final static int MIN_REPLICATION = 1;
-
-    // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
-    final static long HEARTBEAT_RECHECK = 1000;
-
-    // Whether we should use disk-availability info when determining target
-    final static boolean USE_AVAILABILITY = NutchConf.get().getBoolean("ndfs.availability.allocation", false);
-
-    private boolean allowSameHostTargets =
-        NutchConf.get().getBoolean("test.ndfs.same.host.targets.allowed", false);
+   
 
     //
     // Stores the correct file name hierarchy
@@ -148,18 +128,44 @@
     Daemon hbthread = null, lmthread = null;
     boolean fsRunning = true;
     long systemStart = 0;
+    private NutchConf nutchConf;
+
+    //  DESIRED_REPLICATION is how many copies we try to have at all times
+    private int desiredReplication;
+    //  The maximum number of replicates we should allow for a single block
+    private int maxReplication;
+    //  How many outgoing replication streams a given node should have at one time
+    private int maxReplicationStreams;
+    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
+    private int minReplication;
+    // HEARTBEAT_RECHECK is how often a datanode sends its heartbeat
+    private int heartBeatRecheck;
+   //  Whether we should use disk-availability info when determining target
+    private boolean useAvailability;
+
+    private boolean allowSameHostTargets;
     
     /**
      * dir is where the filesystem directory state 
      * is stored
      */
-    public FSNamesystem(File dir) throws IOException {
+    public FSNamesystem(File dir, NutchConf nutchConf) throws IOException {
         this.dir = new FSDirectory(dir);
         this.hbthread = new Daemon(new HeartbeatMonitor());
         this.lmthread = new Daemon(new LeaseMonitor());
         hbthread.start();
         lmthread.start();
         this.systemStart = System.currentTimeMillis();
+        this.nutchConf = nutchConf;
+        
+        this.desiredReplication = nutchConf.getInt("ndfs.replication", 3);
+        this.maxReplication = desiredReplication;
+        this.maxReplicationStreams = nutchConf.getInt("ndfs.max-repl-streams", 2);
+        this.minReplication = 1;
+        this.heartBeatRecheck= 1000;
+        this.useAvailability = nutchConf.getBoolean("ndfs.availability.allocation", false);
+        this.allowSameHostTargets =
+           nutchConf.getBoolean("test.ndfs.same.host.targets.allowed", false);
     }
 
     /** Close down this filesystem manager.
@@ -244,10 +250,10 @@
                 results = new Object[2];
 
                 // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
-                if (targets.length < MIN_REPLICATION) {
+                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
+                if (targets.length < this.minReplication) {
                     LOG.warning("Target-length is " + targets.length +
-                        ", below MIN_REPLICATION (" + MIN_REPLICATION + ")");
+                        ", below MIN_REPLICATION (" + this.minReplication+ ")");
                     return null;
                 }
 
@@ -300,8 +306,8 @@
             //
             if (checkFileProgress(src)) {
                 // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
-                if (targets.length < MIN_REPLICATION) {
+                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
+                if (targets.length < this.minReplication) {
                     return null;
                 }
 
@@ -411,9 +417,9 @@
                 // the blocks.
                 for (int i = 0; i < pendingBlocks.length; i++) {
                     TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
-                    if (containingNodes.size() < DESIRED_REPLICATION) {
+                    if (containingNodes.size() < this.desiredReplication) {
                         synchronized (neededReplications) {
-                            LOG.info("Completed file " + src + ", at holder " + holder + ".  There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + DESIRED_REPLICATION);
+                            LOG.info("Completed file " + src + ", at holder " + holder + ".  There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + this.desiredReplication);
                             neededReplications.add(pendingBlocks[i]);
                         }
                     }
@@ -449,7 +455,7 @@
         for (Iterator it = v.iterator(); it.hasNext(); ) {
             Block b = (Block) it.next();
             TreeSet containingNodes = (TreeSet) blocksMap.get(b);
-            if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) {
+            if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
         }
@@ -814,7 +820,7 @@
             while (fsRunning) {
                 heartbeatCheck();
                 try {
-                    Thread.sleep(HEARTBEAT_RECHECK);
+                    Thread.sleep(heartBeatRecheck);
                 } catch (InterruptedException ie) {
                 }
             }
@@ -946,10 +952,10 @@
 
         synchronized (neededReplications) {
             if (dir.isValidBlock(block)) {
-                if (containingNodes.size() >= DESIRED_REPLICATION) {
+                if (containingNodes.size() >= this.desiredReplication) {
                     neededReplications.remove(block);
                     pendingReplications.remove(block);
-                } else if (containingNodes.size() < DESIRED_REPLICATION) {
+                } else if (containingNodes.size() < this.desiredReplication) {
                     if (! neededReplications.contains(block)) {
                         neededReplications.add(block);
                     }
@@ -968,8 +974,8 @@
                         nonExcess.add(cur);
                     }
                 }
-                if (nonExcess.size() > MAX_REPLICATION) {
-                    chooseExcessReplicates(nonExcess, block, MAX_REPLICATION);    
+                if (nonExcess.size() > this.maxReplication) {
+                    chooseExcessReplicates(nonExcess, block, this.maxReplication);    
                 }
             }
         }
@@ -1032,7 +1038,7 @@
         // necessary.  In that case, put block on a possibly-will-
         // be-replicated list.
         //
-        if (dir.isValidBlock(block) && (containingNodes.size() < DESIRED_REPLICATION)) {
+        if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) {
             synchronized (neededReplications) {
                 neededReplications.add(block);
             }
@@ -1147,7 +1153,7 @@
                     //
                     // We can only reply with 'maxXfers' or fewer blocks
                     //
-                    if (scheduledXfers >= MAX_REPLICATION_STREAMS - xmitsInProgress) {
+                    if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) {
                         break;
                     }
 
@@ -1157,7 +1163,7 @@
                     } else {
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
                         if (containingNodes.contains(srcNode)) {
-                            DatanodeInfo targets[] = chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
+                            DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes);
                             if (targets.length > 0) {
                                 // Build items to return
                                 replicateBlocks.add(block);
@@ -1181,7 +1187,7 @@
                         DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
 
-                        if (containingNodes.size() + targets.length >= DESIRED_REPLICATION) {
+                        if (containingNodes.size() + targets.length >= this.desiredReplication) {
                             neededReplications.remove(block);
                             pendingReplications.add(block);
                         }
@@ -1313,7 +1319,7 @@
                 " forbidden2.size()=" +
                 ( forbidden2 != null ? forbidden2.size() : 0 ));
             return null;
-        } else if (! USE_AVAILABILITY) {
+        } else if (! this.useAvailability) {
             int target = r.nextInt(targetList.size());
             return (DatanodeInfo) targetList.elementAt(target);
         } else {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java Tue Jan 31 08:08:58 2006
@@ -42,8 +42,8 @@
 
     /** Create a new NDFSClient connected to the given namenode server.
      */
-    public NDFSClient(InetSocketAddress nameNodeAddr) {
-        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr);
+    public NDFSClient(InetSocketAddress nameNodeAddr, NutchConf nutchConf) {
+        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, nutchConf);
         this.clientName = "NDFSClient_" + r.nextInt();
         this.leaseChecker = new Daemon(new LeaseChecker());
         this.leaseChecker.start();

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java Tue Jan 31 08:08:58 2006
@@ -48,21 +48,20 @@
     /**
      * Create a NameNode at the default location
      */
-    public NameNode() throws IOException {
-        this(new File(NutchConf.get().get("ndfs.name.dir",
+    public NameNode(NutchConf nutchConf) throws IOException {
+        this(new File(nutchConf.get("ndfs.name.dir",
                                           "/tmp/nutch/ndfs/name")),
              DataNode.createSocketAddr
-             (NutchConf.get().get("fs.default.name", "local")).getPort());
+             (nutchConf.get("fs.default.name", "local")).getPort(), nutchConf);
     }
 
     /**
      * Create a NameNode at the specified location and start it.
      */
-    public NameNode(File dir, int port) throws IOException {
-        this.namesystem = new FSNamesystem(dir);
-        this.handlerCount =
-            NutchConf.get().getInt("ndfs.namenode.handler.count", 10);
-        this.server = RPC.getServer(this, port, handlerCount, false);
+    public NameNode(File dir, int port, NutchConf nutchConf) throws IOException {
+        this.namesystem = new FSNamesystem(dir, nutchConf);
+        this.handlerCount = nutchConf.getInt("ndfs.namenode.handler.count", 10);
+        this.server = RPC.getServer(this, port, handlerCount, false, nutchConf);
         this.server.start();
     }
 
@@ -346,7 +345,7 @@
     /**
      */
     public static void main(String argv[]) throws IOException, InterruptedException {
-        NameNode namenode = new NameNode();
+        NameNode namenode = new NameNode(new NutchConf());
         namenode.join();
     }
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java Tue Jan 31 08:08:58 2006
@@ -23,6 +23,7 @@
 
 import java.util.logging.Logger;
 import org.apache.nutch.util.LogFormatter;
+import org.apache.nutch.util.NutchConf;
 import org.apache.oro.text.regex.*;
 
 /** Converts URLs to a normal form . */
@@ -39,6 +40,8 @@
     private Rule relativePathRule = null;
     private Rule leadingRelativePathRule = null;
 
+    private NutchConf nutchConf;
+
     public BasicUrlNormalizer() {
       try {
         // this pattern tries to find spots like "/xx/../" in the url, which
@@ -171,6 +174,15 @@
         public Perl5Pattern pattern;
         public Perl5Substitution substitution;
     }
+
+
+  public void setConf(NutchConf conf) {
+    this.nutchConf = conf;
+  }
+
+  public NutchConf getConf() {
+    return this.nutchConf;
+  }
 
 }