You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2006/01/31 17:13:17 UTC
svn commit: r373853 [2/6] - in /lucene/nutch/trunk/src:
java/org/apache/nutch/analysis/ java/org/apache/nutch/clustering/
java/org/apache/nutch/crawl/ java/org/apache/nutch/fetcher/
java/org/apache/nutch/fs/ java/org/apache/nutch/indexer/ java/org/apac...
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/NdfsDirectory.java Tue Jan 31 08:08:58 2006
@@ -19,19 +19,22 @@
import java.io.*;
import org.apache.lucene.store.*;
import org.apache.nutch.fs.*;
+import org.apache.nutch.util.NutchConf;
/** Reads a Lucene index stored in NDFS. */
public class NdfsDirectory extends Directory {
private NutchFileSystem fs;
private File directory;
+ private int ioFileBufferSize;
- public NdfsDirectory(NutchFileSystem fs, File directory, boolean create)
+ public NdfsDirectory(NutchFileSystem fs, File directory, boolean create, NutchConf nutchConf)
throws IOException {
this.fs = fs;
this.directory = directory;
-
+ this.ioFileBufferSize = nutchConf.getInt("io.file.buffer.size", 4096);
+
if (create) {
create();
}
@@ -103,12 +106,12 @@
if (fs.exists(file) && !fs.delete(file)) // delete existing, if any
throw new IOException("Cannot overwrite: " + file);
- return new NdfsIndexOutput(file);
+ return new NdfsIndexOutput(file, this.ioFileBufferSize);
}
public IndexInput openInput(String name) throws IOException {
- return new NdfsIndexInput(new File(directory, name));
+ return new NdfsIndexInput(new File(directory, name), this.ioFileBufferSize);
}
public Lock makeLock(final String name) {
@@ -152,7 +155,7 @@
private class Descriptor {
public NFSDataInputStream in;
public long position; // cache of in.getPos()
- public Descriptor(File file) throws IOException {
+ public Descriptor(File file, int ioFileBufferSize) throws IOException {
this.in = fs.open(file);
}
}
@@ -161,8 +164,8 @@
private final long length;
private boolean isClone;
- public NdfsIndexInput(File path) throws IOException {
- descriptor = new Descriptor(path);
+ public NdfsIndexInput(File path, int ioFileBufferSize) throws IOException {
+ descriptor = new Descriptor(path,ioFileBufferSize);
length = fs.getLength(path);
}
@@ -211,7 +214,7 @@
private class NdfsIndexOutput extends BufferedIndexOutput {
private NFSDataOutputStream out;
- public NdfsIndexOutput(File path) throws IOException {
+ public NdfsIndexOutput(File path, int ioFileBufferSize) throws IOException {
out = fs.create(path);
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/ArrayFile.java Tue Jan 31 08:08:58 2006
@@ -46,8 +46,8 @@
private LongWritable key = new LongWritable();
/** Construct an array reader for the named file.*/
- public Reader(NutchFileSystem nfs, String file) throws IOException {
- super(nfs, file);
+ public Reader(NutchFileSystem nfs, String file, NutchConf nutchConf) throws IOException {
+ super(nfs, file, nutchConf);
}
/** Positions the reader before its <code>n</code>th value. */
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/MapFile.java Tue Jan 31 08:08:58 2006
@@ -44,11 +44,8 @@
/** The name of the data file. */
public static final String DATA_FILE_NAME = "data";
- /** Number of index entries to skip between each entry. Zero by default.
- * Setting this to values larger than zero can facilitate opening large map
- * files using less memory. */
- public static final int INDEX_SKIP =
- NutchConf.get().getInt("io.map.index.skip", 0);
+
+
protected MapFile() {} // no public ctor
@@ -160,6 +157,12 @@
/** Provide access to an existing map. */
public static class Reader {
+
+ /** Number of index entries to skip between each entry. Zero by default.
+ * Setting this to values larger than zero can facilitate opening large map
+ * files using less memory. */
+ private int INDEX_SKIP = 0;
+
private WritableComparator comparator;
private DataOutputBuffer keyBuf = new DataOutputBuffer();
@@ -190,19 +193,20 @@
public Class getValueClass() { return data.getValueClass(); }
/** Construct a map reader for the named map.*/
- public Reader(NutchFileSystem nfs, String dirName) throws IOException {
- this(nfs, dirName, null);
+ public Reader(NutchFileSystem nfs, String dirName, NutchConf nutchConf) throws IOException {
+ this(nfs, dirName, null, nutchConf);
+ INDEX_SKIP = nutchConf.getInt("io.map.index.skip", 0);
}
/** Construct a map reader for the named map using the named comparator.*/
- public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator)
+ public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator, NutchConf nutchConf)
throws IOException {
File dir = new File(dirName);
File dataFile = new File(dir, DATA_FILE_NAME);
File indexFile = new File(dir, INDEX_FILE_NAME);
// open the data
- this.data = new SequenceFile.Reader(nfs, dataFile.getPath());
+ this.data = new SequenceFile.Reader(nfs, dataFile.getPath(), nutchConf);
this.firstPosition = data.getPosition();
if (comparator == null)
@@ -213,7 +217,7 @@
this.getKey = this.comparator.newKey();
// open the index
- this.index = new SequenceFile.Reader(nfs, indexFile.getPath());
+ this.index = new SequenceFile.Reader(nfs, indexFile.getPath(), nutchConf);
}
private void readIndex() throws IOException {
@@ -416,7 +420,7 @@
* @throws Exception
*/
public static long fix(NutchFileSystem nfs, File dir,
- Class keyClass, Class valueClass, boolean dryrun) throws Exception {
+ Class keyClass, Class valueClass, boolean dryrun, NutchConf nutchConf) throws Exception {
String dr = (dryrun ? "[DRY RUN ] " : "");
File data = new File(dir, DATA_FILE_NAME);
File index = new File(dir, INDEX_FILE_NAME);
@@ -429,7 +433,7 @@
// no fixing needed
return -1;
}
- SequenceFile.Reader dataReader = new SequenceFile.Reader(nfs, data.toString());
+ SequenceFile.Reader dataReader = new SequenceFile.Reader(nfs, data.toString(), nutchConf);
if (!dataReader.getKeyClass().equals(keyClass)) {
throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
", got " + dataReader.getKeyClass().getName());
@@ -474,8 +478,10 @@
String in = args[0];
String out = args[1];
- NutchFileSystem nfs = new LocalFileSystem();
- MapFile.Reader reader = new MapFile.Reader(nfs, in);
+ NutchConf conf = new NutchConf();
+ int ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
+ NutchFileSystem nfs = new LocalFileSystem(conf);
+ MapFile.Reader reader = new MapFile.Reader(nfs, in, conf);
MapFile.Writer writer =
new MapFile.Writer(nfs, out, reader.getKeyClass(), reader.getValueClass());
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/ObjectWritable.java Tue Jan 31 08:08:58 2006
@@ -25,13 +25,17 @@
import java.io.*;
import java.util.*;
+import org.apache.nutch.util.NutchConf;
+import org.apache.nutch.util.NutchConfigurable;
+
/** A polymorphic Writable that writes an instance with it's class name.
* Handles arrays, strings and primitive types without a Writable wrapper.
*/
-public class ObjectWritable implements Writable {
+public class ObjectWritable implements Writable, NutchConfigurable {
private Class declaredClass;
private Object instance;
+ private NutchConf nutchConf;
public ObjectWritable() {}
@@ -57,7 +61,7 @@
}
public void readFields(DataInput in) throws IOException {
- readObject(in, this);
+ readObject(in, this, this.nutchConf);
}
public void write(DataOutput out) throws IOException {
@@ -165,14 +169,14 @@
/** Read a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
- public static Object readObject(DataInput in)
+ public static Object readObject(DataInput in, NutchConf nutchConf)
throws IOException {
- return readObject(in, null);
+ return readObject(in, null, nutchConf);
}
/** Read a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
- public static Object readObject(DataInput in, ObjectWritable objectWritable)
+ public static Object readObject(DataInput in, ObjectWritable objectWritable, NutchConf nutchConf)
throws IOException {
String className = UTF8.readString(in);
Class declaredClass = (Class)PRIMITIVE_NAMES.get(className);
@@ -220,7 +224,7 @@
int length = in.readInt();
instance = Array.newInstance(declaredClass.getComponentType(), length);
for (int i = 0; i < length; i++) {
- Array.set(instance, i, readObject(in));
+ Array.set(instance, i, readObject(in, nutchConf));
}
} else if (declaredClass == String.class) { // String
@@ -229,6 +233,9 @@
} else { // Writable
try {
Writable writable = (Writable)declaredClass.newInstance();
+ if(writable instanceof NutchConfigurable) {
+ ((NutchConfigurable) writable).setConf(nutchConf);
+ }
writable.readFields(in);
instance = writable;
} catch (InstantiationException e) {
@@ -245,6 +252,14 @@
return instance;
+ }
+
+ public void setConf(NutchConf conf) {
+ this.nutchConf = conf;
+ }
+
+ public NutchConf getConf() {
+ return this.nutchConf;
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/SequenceFile.java Tue Jan 31 08:08:58 2006
@@ -222,10 +222,12 @@
private byte[] inflateIn = new byte[1024];
private DataOutputBuffer inflateOut = new DataOutputBuffer();
private Inflater inflater = new Inflater();
+ private NutchConf nutchConf;
/** Open the named file. */
- public Reader(NutchFileSystem nfs, String file) throws IOException {
- this(nfs, file, NutchConf.get().getInt("io.file.buffer.size", 4096));
+ public Reader(NutchFileSystem nfs, String file, NutchConf nutchConf) throws IOException {
+ this(nfs, file, nutchConf.getInt("io.file.buffer.size", 4096));
+ this.nutchConf = nutchConf;
}
private Reader(NutchFileSystem nfs, String name, int bufferSize) throws IOException {
@@ -339,7 +341,9 @@
}
inBuf.reset(inflateOut.getData(), inflateOut.getLength());
}
-
+ if(val instanceof NutchConfigurable) {
+ ((NutchConfigurable) val).setConf(this.nutchConf);
+ }
val.readFields(inBuf);
if (inBuf.getPosition() != inBuf.getLength())
@@ -386,9 +390,9 @@
private void handleChecksumException(ChecksumException e)
throws IOException {
- if (NutchConf.get().getBoolean("io.skip.checksum.errors", false)) {
+ if (this.nutchConf.getBoolean("io.skip.checksum.errors", false)) {
LOG.warning("Bad checksum at "+getPosition()+". Skipping entries.");
- sync(getPosition()+NutchConf.get().getInt("io.bytes.per.checksum", 512));
+ sync(getPosition()+this.nutchConf.getInt("io.bytes.per.checksum", 512));
} else {
throw e;
}
@@ -449,10 +453,6 @@
* very efficient. In particular, it should avoid allocating memory.
*/
public static class Sorter {
- private static final int FACTOR =
- NutchConf.get().getInt("io.sort.factor", 100);
- private static final int MEGABYTES =
- NutchConf.get().getInt("io.sort.mb", 100);
private WritableComparator comparator;
@@ -461,25 +461,30 @@
private String outFile;
- private int memory = MEGABYTES * 1024*1024; // bytes
- private int factor = FACTOR; // merged per pass
+ private int memory; // bytes
+ private int factor; // merged per pass
private NutchFileSystem nfs = null;
private Class keyClass;
private Class valClass;
+ private NutchConf nutchConf;
+
/** Sort and merge files containing the named classes. */
- public Sorter(NutchFileSystem nfs, Class keyClass, Class valClass) {
- this(nfs, new WritableComparator(keyClass), valClass);
+ public Sorter(NutchFileSystem nfs, Class keyClass, Class valClass, NutchConf nutchConf) {
+ this(nfs, new WritableComparator(keyClass), valClass, nutchConf);
}
/** Sort and merge using an arbitrary {@link WritableComparator}. */
- public Sorter(NutchFileSystem nfs, WritableComparator comparator, Class valClass) {
+ public Sorter(NutchFileSystem nfs, WritableComparator comparator, Class valClass, NutchConf nutchConf) {
this.nfs = nfs;
this.comparator = comparator;
this.keyClass = comparator.getKeyClass();
this.valClass = valClass;
+ this.memory = nutchConf.getInt("io.sort.mb", 100) * 1024 * 1024;
+ this.factor = nutchConf.getInt("io.sort.factor", 100);
+ this.nutchConf = nutchConf;
}
/** Set the number of streams to merge at once.*/
@@ -513,7 +518,7 @@
private int sortPass() throws IOException {
LOG.fine("running sort pass");
- SortPass sortPass = new SortPass(); // make the SortPass
+ SortPass sortPass = new SortPass(this.nutchConf); // make the SortPass
try {
return sortPass.run(); // run it
} finally {
@@ -536,8 +541,8 @@
private NFSDataOutputStream out;
private String outName;
- public SortPass() throws IOException {
- in = new Reader(nfs, inFile);
+ public SortPass(NutchConf nutchConf) throws IOException {
+ in = new Reader(nfs, inFile, nutchConf);
}
public int run() throws IOException {
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/io/SetFile.java Tue Jan 31 08:08:58 2006
@@ -51,14 +51,14 @@
public static class Reader extends MapFile.Reader {
/** Construct a set reader for the named set.*/
- public Reader(NutchFileSystem nfs, String dirName) throws IOException {
- super(nfs, dirName);
+ public Reader(NutchFileSystem nfs, String dirName, NutchConf nutchConf) throws IOException {
+ super(nfs, dirName, nutchConf);
}
/** Construct a set reader for the named set using the named comparator.*/
- public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator)
+ public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator, NutchConf nutchConf)
throws IOException {
- super(nfs, dirName, comparator);
+ super(nfs, dirName, comparator, nutchConf);
}
// javadoc inherited
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Client.java Tue Jan 31 08:08:58 2006
@@ -35,6 +35,7 @@
import org.apache.nutch.util.LogFormatter;
import org.apache.nutch.util.NutchConf;
+import org.apache.nutch.util.NutchConfigurable;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.UTF8;
@@ -52,9 +53,10 @@
private Hashtable connections = new Hashtable();
private Class valueClass; // class of call values
- private int timeout = NutchConf.get().getInt("ipc.client.timeout",10000); // timeout for calls
+ private int timeout ;// timeout for calls
private int counter; // counter for call ids
private boolean running = true; // true while client runs
+ private NutchConf nutchConf;
/** A call waiting for a value. */
private class Call {
@@ -160,6 +162,9 @@
Writable value = makeValue();
try {
readingCall = call;
+ if(value instanceof NutchConfigurable) {
+ ((NutchConfigurable) value).setConf(nutchConf);
+ }
value.readFields(in); // read value
} finally {
readingCall = null;
@@ -256,8 +261,10 @@
/** Construct an IPC client whose values are of the given {@link Writable}
* class. */
- public Client(Class valueClass) {
+ public Client(Class valueClass, NutchConf nutchConf) {
this.valueClass = valueClass;
+ this.timeout = nutchConf.getInt("ipc.client.timeout",10000);
+ this.nutchConf = nutchConf;
}
/** Stop all threads related to this client. No further calls may be made
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ipc/RPC.java Tue Jan 31 08:08:58 2006
@@ -56,10 +56,11 @@
/** A method invocation, including the method name and its parameters.*/
- private static class Invocation implements Writable {
+ private static class Invocation implements Writable, NutchConfigurable {
private String methodName;
private Class[] parameterClasses;
private Object[] parameters;
+ private NutchConf nutchConf;
public Invocation() {}
@@ -82,10 +83,9 @@
methodName = UTF8.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
-
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
- parameters[i] = ObjectWritable.readObject(in, objectWritable);
+ parameters[i] = ObjectWritable.readObject(in, objectWritable, this.nutchConf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
@@ -111,15 +111,29 @@
return buffer.toString();
}
+ public void setConf(NutchConf conf) {
+ this.nutchConf = conf;
+ }
+
+ public NutchConf getConf() {
+ return this.nutchConf;
+ }
+
}
- private static Client CLIENT = new Client(ObjectWritable.class);
+ //TODO mb@media-style.com: static client or non-static client?
+ private static Client CLIENT;
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
- public Invoker(InetSocketAddress address) {
+ public Invoker(InetSocketAddress address, NutchConf nutchConf) {
this.address = address;
+ CLIENT = (Client) nutchConf.getObject(Client.class.getName());
+ if(CLIENT == null) {
+ CLIENT = new Client(ObjectWritable.class, nutchConf);
+ nutchConf.setObject(Client.class.getName(), CLIENT);
+ }
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -132,21 +146,25 @@
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static Object getProxy(Class protocol, InetSocketAddress addr) {
+ public static Object getProxy(Class protocol, InetSocketAddress addr, NutchConf nutchConf) {
return Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol },
- new Invoker(addr));
+ new Invoker(addr, nutchConf));
}
/** Expert: Make multiple, parallel calls to a set of servers. */
public static Object[] call(Method method, Object[][] params,
- InetSocketAddress[] addrs)
+ InetSocketAddress[] addrs, NutchConf nutchConf)
throws IOException {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
-
+ CLIENT = (Client) nutchConf.getObject(Client.class.getName());
+ if(CLIENT == null) {
+ CLIENT = new Client(ObjectWritable.class, nutchConf);
+ nutchConf.setObject(Client.class.getName(), CLIENT);
+ }
Writable[] wrappedValues = CLIENT.call(invocations, addrs);
if (method.getReturnType() == Void.TYPE) {
@@ -165,16 +183,16 @@
/** Construct a server for a protocol implementation instance listening on a
* port. */
- public static Server getServer(final Object instance, final int port) {
- return getServer(instance, port, 1, false);
+ public static Server getServer(final Object instance, final int port, NutchConf nutchConf) {
+ return getServer(instance, port, 1, false, nutchConf);
}
/** Construct a server for a protocol implementation instance listening on a
* port. */
public static Server getServer(final Object instance, final int port,
final int numHandlers,
- final boolean verbose) {
- return new Server(port, Invocation.class, numHandlers) {
+ final boolean verbose, NutchConf nutchConf) {
+ return new Server(port, Invocation.class, numHandlers, nutchConf) {
Class implementation = instance.getClass();
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ipc/Server.java Tue Jan 31 08:08:58 2006
@@ -53,7 +53,7 @@
private int maxQueuedCalls; // max number of queued calls
private Class paramClass; // class of call parameters
- private int timeout = NutchConf.get().getInt("ipc.client.timeout",10000);
+ private int timeout;
private boolean running = true; // true while server runs
private LinkedList callQueue = new LinkedList(); // queued calls
@@ -228,11 +228,12 @@
* be of the named class. The <code>handlerCount</code> determines
* the number of handler threads that will be used to process calls.
*/
- protected Server(int port, Class paramClass, int handlerCount) {
+ protected Server(int port, Class paramClass, int handlerCount, NutchConf nutchConf) {
this.port = port;
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.maxQueuedCalls = handlerCount;
+ this.timeout = nutchConf.getInt("ipc.client.timeout",10000);
}
/** Sets the timeout used for network i/o. */
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/CombiningCollector.java Tue Jan 31 08:08:58 2006
@@ -28,8 +28,7 @@
* then invokes the combiner's reduce method to merge some values before
* they're transferred to a reduce node. */
class CombiningCollector implements OutputCollector {
- private static final int LIMIT
- = NutchConf.get().getInt("mapred.combine.buffer.size", 100000);
+ private int limit;
private int count = 0;
private Map keyToValues; // the buffer
@@ -46,6 +45,7 @@
this.reporter = reporter;
this.combiner = (Reducer)job.newInstance(job.getCombinerClass());
this.keyToValues = new TreeMap(job.getOutputKeyComparator());
+ this.limit = job.getInt("mapred.combine.buffer.size", 100000);
}
public synchronized void collect(WritableComparable key, Writable value)
@@ -63,7 +63,7 @@
count++;
- if (count >= LIMIT) { // time to flush
+ if (count >= this.limit) { // time to flush
flush();
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobClient.java Tue Jan 31 08:08:58 2006
@@ -166,28 +166,30 @@
JobSubmissionProtocol jobSubmitClient;
NutchFileSystem fs = null;
+ private NutchConf nutchConf;
static Random r = new Random();
/**
* Build a job client, connect to the default job tracker
*/
public JobClient(NutchConf conf) throws IOException {
+ this.nutchConf = conf;
String tracker = conf.get("mapred.job.tracker", "local");
if ("local".equals(tracker)) {
- this.jobSubmitClient = new LocalJobRunner();
+ this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.jobSubmitClient = (JobSubmissionProtocol)
RPC.getProxy(JobSubmissionProtocol.class,
- JobTracker.getAddress(conf));
+ JobTracker.getAddress(conf), conf);
}
}
/**
* Build a job client, connect to the indicated job tracker.
*/
- public JobClient(InetSocketAddress jobTrackAddr) throws IOException {
+ public JobClient(InetSocketAddress jobTrackAddr, NutchConf nutchConf) throws IOException {
this.jobSubmitClient = (JobSubmissionProtocol)
- RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr);
+ RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr, nutchConf);
}
@@ -207,7 +209,7 @@
public synchronized NutchFileSystem getFs() throws IOException {
if (this.fs == null) {
String fsName = jobSubmitClient.getFilesystemName();
- this.fs = NutchFileSystem.getNamed(fsName);
+ this.fs = NutchFileSystem.getNamed(fsName, this.nutchConf);
}
return fs;
}
@@ -234,7 +236,7 @@
//
// Create a number of filenames in the JobTracker's fs namespace
- File submitJobDir = new File(JobConf.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()),36));
+ File submitJobDir = new File(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
File submitJobFile = new File(submitJobDir, "job.xml");
File submitJarFile = new File(submitJobDir, "job.jar");
@@ -349,7 +351,7 @@
}
// Submit the request
- JobClient jc = new JobClient(NutchConf.get());
+ JobClient jc = new JobClient(new NutchConf());
try {
if (submitJobFile != null) {
RunningJob job = jc.submitJob(submitJobFile);
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobConf.java Tue Jan 31 08:08:58 2006
@@ -50,9 +50,15 @@
* of input files, and where the output files should be written. */
public class JobConf extends NutchConf {
- /** Construct a map/reduce job configuration.
- *
- * @param conf a NutchConf whose settings will be inherited.
+ public JobConf() {
+ super();
+ }
+
+ /**
+ * Construct a map/reduce job configuration.
+ *
+ * @param conf
+ * a NutchConf whose settings will be inherited.
*/
public JobConf(NutchConf conf) {
super(conf);
@@ -81,32 +87,32 @@
public String getJar() { return get("mapred.jar"); }
public void setJar(String jar) { set("mapred.jar", jar); }
- public static File getSystemDir() {
- return new File(NutchConf.get().get("mapred.system.dir",
+ public File getSystemDir() {
+ return new File(get("mapred.system.dir",
"/tmp/nutch/mapred/system"));
}
- public static String[] getLocalDirs() throws IOException {
- return NutchConf.get().getStrings("mapred.local.dir");
+ public String[] getLocalDirs() throws IOException {
+ return getStrings("mapred.local.dir");
}
- public static void deleteLocalFiles() throws IOException {
+ public void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
- FileUtil.fullyDelete(new File(localDirs[i]));
+ FileUtil.fullyDelete(new File(localDirs[i]), this);
}
}
- public static void deleteLocalFiles(String subdir) throws IOException {
+ public void deleteLocalFiles(String subdir) throws IOException {
String[] localDirs = getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
- FileUtil.fullyDelete(new File(localDirs[i], subdir));
+ FileUtil.fullyDelete(new File(localDirs[i], subdir), this);
}
}
/** Constructs a local file name. Files are distributed among configured
* local directories.*/
- public static File getLocalFile(String subdir, String name)
+ public File getLocalFile(String subdir, String name)
throws IOException {
String[] localDirs = getLocalDirs();
String path = subdir + File.separator + name;
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/JobTracker.java Tue Jan 31 08:08:58 2006
@@ -210,25 +210,29 @@
NutchFileSystem fs;
File systemDir;
+ private NutchConf nutchConf;
+
/**
* Start the JobTracker process, listen on the indicated port
*/
JobTracker(NutchConf conf) throws IOException {
// This is a directory of temporary submission files. We delete it
// on startup, and can delete any files that we're done with
- this.systemDir = JobConf.getSystemDir();
- this.fs = NutchFileSystem.get();
+ JobConf jobConf = new JobConf(conf);
+ this.systemDir = jobConf.getSystemDir();
+ this.nutchConf = conf;
+ this.fs = NutchFileSystem.get(conf);
FileUtil.fullyDelete(fs, systemDir);
fs.mkdirs(systemDir);
// Same with 'localDir' except it's always on the local disk.
- JobConf.deleteLocalFiles(SUBDIR);
+ jobConf.deleteLocalFiles(SUBDIR);
// Set ports, start RPC servers, etc.
InetSocketAddress addr = getAddress(conf);
this.localMachine = addr.getHostName();
this.port = addr.getPort();
- this.interTrackerServer = RPC.getServer(this,addr.getPort(),10,false);
+ this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
this.interTrackerServer.start();
Properties p = System.getProperties();
for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
@@ -529,7 +533,7 @@
* task allocation.)
*/
JobInProgress createJob(String jobFile) throws IOException {
- JobInProgress job = new JobInProgress(jobFile);
+ JobInProgress job = new JobInProgress(jobFile, this.nutchConf);
jobs.put(job.getProfile().getJobId(), job);
boolean error = true;
@@ -576,6 +580,7 @@
long startTime;
long finishTime;
String deleteUponCompletion = null;
+ private NutchConf nutchConf;
/**
* Create a 'JobInProgress' object, which contains both JobProfile
@@ -583,13 +588,13 @@
* of the JobTracker. But JobInProgress adds info that's useful for
* the JobTracker alone.
*/
- public JobInProgress(String jobFile) throws IOException {
+ public JobInProgress(String jobFile, NutchConf nutchConf) throws IOException {
String jobid = createJobId();
String url = "http://" + localMachine + ":" + infoPort + "/jobdetails.jsp?jobid=" + jobid;
this.profile = new JobProfile(jobid, jobFile, url);
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.RUNNING);
- this.localJobFile = JobConf.getLocalFile(SUBDIR, jobid+".xml");
+ this.localJobFile = new JobConf(nutchConf).getLocalFile(SUBDIR, jobid + ".xml");
fs.copyToLocalFile(new File(jobFile), localJobFile);
JobConf jd = new JobConf(localJobFile);
@@ -602,6 +607,7 @@
if (jobFile.startsWith(systemDir.getPath())) {
this.deleteUponCompletion = jobFile;
}
+ this.nutchConf = nutchConf;
}
/**
@@ -613,7 +619,7 @@
// construct input splits
JobConf jd = new JobConf(localJobFile);
- NutchFileSystem fs = NutchFileSystem.get();
+ NutchFileSystem fs = NutchFileSystem.get(nutchConf);
FileSplit[] splits =
jd.getInputFormat().getSplits(fs, jd, numMapTasks);
@@ -634,6 +640,7 @@
for (int i = 0; i < numMapTasks; i++) {
mapIds[i] = createMapTaskId();
Task t = new MapTask(jobFile, mapIds[i], splits[i]);
+ t.setConf(this.nutchConf);
incompleteMapTasks.put(mapIds[i], t);
taskToJobMap.put(mapIds[i], jobid);
@@ -643,6 +650,7 @@
for (int i = 0; i < numReduceTasks; i++) {
String taskid = createReduceTaskId();
Task t = new ReduceTask(jobFile, taskid, mapIds, i);
+ t.setConf(this.nutchConf);
reducesToLaunch.add(t);
taskToJobMap.put(taskid, jobid);
}
@@ -1067,6 +1075,6 @@
System.exit(-1);
}
- startTracker(NutchConf.get());
+ startTracker(new NutchConf());
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/LocalJobRunner.java Tue Jan 31 08:08:58 2006
@@ -31,6 +31,7 @@
private NutchFileSystem fs;
private HashMap jobs = new HashMap();
+ private NutchConf nutchConf;
private class Job extends Thread
implements TaskUmbilicalProtocol {
@@ -40,15 +41,19 @@
private JobStatus status = new JobStatus();
private ArrayList mapIds = new ArrayList();
+ private MapOutputFile mapoutputFile;
- public Job(String file) throws IOException {
+ public Job(String file, NutchConf nutchConf) throws IOException {
this.file = file;
this.id = "job_" + newId();
+ this.mapoutputFile = new MapOutputFile();
+ this.mapoutputFile.setConf(nutchConf);
- File localFile = JobConf.getLocalFile("localRunner", id+".xml");
+ File localFile = new JobConf(nutchConf).getLocalFile("localRunner", id+".xml");
fs.copyToLocalFile(new File(file), localFile);
this.job = new JobConf(localFile);
-
+
+
this.status.jobid = id;
this.status.runState = JobStatus.RUNNING;
@@ -67,6 +72,7 @@
for (int i = 0; i < splits.length; i++) {
mapIds.add("map_" + newId());
MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
+ map.setConf(job);
map.run(job, this);
}
@@ -74,12 +80,12 @@
String reduceId = "reduce_" + newId();
for (int i = 0; i < mapIds.size(); i++) {
String mapId = (String)mapIds.get(i);
- File mapOut = MapOutputFile.getOutputFile(mapId, 0);
- File reduceIn = MapOutputFile.getInputFile(mapId, reduceId);
+ File mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
+ File reduceIn = this.mapoutputFile.getInputFile(mapId, reduceId);
reduceIn.getParentFile().mkdirs();
- if (!NutchFileSystem.getNamed("local").rename(mapOut, reduceIn))
+ if (!NutchFileSystem.getNamed("local", this.job).rename(mapOut, reduceIn))
throw new IOException("Couldn't rename " + mapOut);
- MapOutputFile.removeAll(mapId);
+ this.mapoutputFile.removeAll(mapId);
}
// run a single reduce task
@@ -87,8 +93,9 @@
new ReduceTask(file, reduceId,
(String[])mapIds.toArray(new String[0]),
0);
+ reduce.setConf(job);
reduce.run(job, this);
- MapOutputFile.removeAll(reduceId);
+ this.mapoutputFile.removeAll(reduceId);
this.status.runState = JobStatus.SUCCEEDED;
@@ -138,14 +145,15 @@
}
- public LocalJobRunner() throws IOException {
- this.fs = NutchFileSystem.get();
+ public LocalJobRunner(NutchConf nutchConf) throws IOException {
+ this.fs = NutchFileSystem.get(nutchConf);
+ this.nutchConf = nutchConf;
}
// JobSubmissionProtocol methods
public JobStatus submitJob(String jobFile) throws IOException {
- return new Job(jobFile).status;
+ return new Job(jobFile, this.nutchConf).status;
}
public void killJob(String id) {
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java Tue Jan 31 08:08:58 2006
@@ -25,6 +25,7 @@
import org.apache.nutch.io.MapFile;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.Writable;
+import org.apache.nutch.util.NutchConf;
public class MapFileOutputFormat implements OutputFormat {
@@ -52,7 +53,7 @@
}
/** Open the output generated by this format. */
- public static MapFile.Reader[] getReaders(NutchFileSystem fs, File dir)
+ public static MapFile.Reader[] getReaders(NutchFileSystem fs, File dir, NutchConf nutchConf)
throws IOException {
File[] names = fs.listFiles(dir);
@@ -61,7 +62,7 @@
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
- parts[i] = new MapFile.Reader(fs, names[i].toString());
+ parts[i] = new MapFile.Reader(fs, names[i].toString(), nutchConf);
}
return parts;
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapOutputFile.java Tue Jan 31 08:08:58 2006
@@ -24,19 +24,20 @@
import org.apache.nutch.util.*;
/** A local file to be transferred via the {@link MapOutputProtocol}. */
-public class MapOutputFile implements Writable {
+public class MapOutputFile implements Writable, NutchConfigurable {
private String mapTaskId;
private String reduceTaskId;
private int partition;
/** Permits reporting of file copy progress. */
- public static interface ProgressReporter {
+ public interface ProgressReporter {
void progress(float progress) throws IOException;
}
- private static final ThreadLocal REPORTERS = new ThreadLocal();
+ private ThreadLocal REPORTERS = new ThreadLocal();
+ private JobConf jobConf;
- public static void setProgressReporter(ProgressReporter reporter) {
+ public void setProgressReporter(ProgressReporter reporter) {
REPORTERS.set(reporter);
}
@@ -44,36 +45,37 @@
* @param mapTaskId a map task id
* @param partition a reduce partition
*/
- public static File getOutputFile(String mapTaskId, int partition)
+ public File getOutputFile(String mapTaskId, int partition)
throws IOException {
- return JobConf.getLocalFile(mapTaskId, "part-"+partition+".out");
+ return this.jobConf.getLocalFile(mapTaskId, "part-"+partition+".out");
}
/** Create a local reduce input file name.
* @param mapTaskId a map task id
* @param reduceTaskId a reduce task id
*/
- public static File getInputFile(String mapTaskId, String reduceTaskId)
+ public File getInputFile(String mapTaskId, String reduceTaskId)
throws IOException {
- return JobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
+ return this.jobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
}
/** Removes all of the files related to a task. */
- public static void removeAll(String taskId) throws IOException {
- JobConf.deleteLocalFiles(taskId);
+ public void removeAll(String taskId) throws IOException {
+ this.jobConf.deleteLocalFiles(taskId);
}
/**
* Removes all contents of temporary storage. Called upon
* startup, to remove any leftovers from previous run.
*/
- public static void cleanupStorage() throws IOException {
- JobConf.deleteLocalFiles();
+ public void cleanupStorage() throws IOException {
+ this.jobConf.deleteLocalFiles();
}
/** Construct a file for transfer. */
- public MapOutputFile() {
+ public MapOutputFile() {
}
+
public MapOutputFile(String mapTaskId, String reduceTaskId, int partition) {
this.mapTaskId = mapTaskId;
this.reduceTaskId = reduceTaskId;
@@ -88,7 +90,7 @@
// write the length-prefixed file content to the wire
File file = getOutputFile(mapTaskId, partition);
out.writeLong(file.length());
- NFSDataInputStream in = NutchFileSystem.getNamed("local").open(file);
+ NFSDataInputStream in = NutchFileSystem.getNamed("local", this.jobConf).open(file);
try {
byte[] buffer = new byte[8192];
int l;
@@ -112,7 +114,7 @@
long length = in.readLong();
float progPerByte = 1.0f / length;
long unread = length;
- NFSDataOutputStream out = NutchFileSystem.getNamed("local").create(file);
+ NFSDataOutputStream out = NutchFileSystem.getNamed("local", this.jobConf).create(file);
try {
byte[] buffer = new byte[8192];
while (unread > 0) {
@@ -127,6 +129,14 @@
} finally {
out.close();
}
+ }
+
+ public void setConf(NutchConf conf) {
+ this.jobConf = new JobConf(conf);
+ }
+
+ public NutchConf getConf() {
+ return this.jobConf;
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTask.java Tue Jan 31 08:08:58 2006
@@ -27,6 +27,8 @@
/** A Map task. */
public class MapTask extends Task {
private FileSplit split;
+ private MapOutputFile mapOutputFile;
+ private NutchConf nutchConf;
public MapTask() {}
@@ -36,7 +38,7 @@
}
public TaskRunner createRunner(TaskTracker tracker) {
- return new MapTaskRunner(this, tracker);
+ return new MapTaskRunner(this, tracker, this.nutchConf);
}
public FileSplit getSplit() { return split; }
@@ -62,8 +64,8 @@
try {
for (int i = 0; i < partitions; i++) {
outs[i] =
- new SequenceFile.Writer(NutchFileSystem.getNamed("local"),
- MapOutputFile.getOutputFile(getTaskId(), i).toString(),
+ new SequenceFile.Writer(NutchFileSystem.getNamed("local", job),
+ this.mapOutputFile.getOutputFile(getTaskId(), i).toString(),
job.getOutputKeyClass(),
job.getOutputValueClass());
}
@@ -91,7 +93,7 @@
final RecordReader rawIn = // open input
job.getInputFormat().getRecordReader
- (NutchFileSystem.get(), split, job, reporter);
+ (NutchFileSystem.get(job), split, job, reporter);
RecordReader in = new RecordReader() { // wrap in progress reporter
private float perByte = 1.0f /(float)split.getLength();
@@ -130,6 +132,16 @@
}
}
done(umbilical);
+ }
+
+ public void setConf(NutchConf conf) {
+ this.nutchConf = conf;
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(conf);
+ }
+
+ public NutchConf getConf() {
+ return this.nutchConf;
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/MapTaskRunner.java Tue Jan 31 08:08:58 2006
@@ -26,17 +26,22 @@
/** Runs a map task. */
class MapTaskRunner extends TaskRunner {
- public MapTaskRunner(Task task, TaskTracker tracker) {
- super(task, tracker);
+ private MapOutputFile mapOutputFile;
+
+ public MapTaskRunner(Task task, TaskTracker tracker, NutchConf nutchConf) {
+ super(task, tracker, nutchConf);
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(nutchConf);
}
+
/** Delete any temporary files from previous failed attempts. */
public void prepare() throws IOException {
- MapOutputFile.removeAll(getTask().getTaskId());
+ this.mapOutputFile.removeAll(getTask().getTaskId());
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- MapOutputFile.removeAll(getTask().getTaskId());
+ this.mapOutputFile.removeAll(getTask().getTaskId());
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTask.java Tue Jan 31 08:08:58 2006
@@ -37,6 +37,8 @@
private Progress appendPhase = getProgress().addPhase("append");
private Progress sortPhase = getProgress().addPhase("sort");
private Progress reducePhase = getProgress().addPhase("reduce");
+ private NutchConf nutchConf;
+ private MapOutputFile mapOutputFile;
public ReduceTask() {}
@@ -48,7 +50,7 @@
}
public TaskRunner createRunner(TaskTracker tracker) {
- return new ReduceTaskRunner(this, tracker);
+ return new ReduceTaskRunner(this, tracker, this.nutchConf);
}
public String[] getMapTaskIds() { return mapTaskIds; }
@@ -157,7 +159,8 @@
Class keyClass = job.getOutputKeyClass();
Class valueClass = job.getOutputValueClass();
Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
- NutchFileSystem lfs = NutchFileSystem.getNamed("local");
+ reducer.configure(job);
+ NutchFileSystem lfs = NutchFileSystem.getNamed("local", job);
copyPhase.complete(); // copy is already complete
@@ -175,12 +178,12 @@
for (int i = 0; i < mapTaskIds.length; i++) {
File partFile =
- MapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
+ this.mapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
float progPerByte = 1.0f / lfs.getLength(partFile);
Progress phase = appendPhase.phase();
phase.setStatus(partFile.toString());
SequenceFile.Reader in =
- new SequenceFile.Reader(lfs, partFile.toString());
+ new SequenceFile.Reader(lfs, partFile.toString(), job);
try {
int keyLen;
while((keyLen = in.next(buffer)) > 0) {
@@ -227,7 +230,7 @@
// sort the input file
SequenceFile.Sorter sorter =
- new SequenceFile.Sorter(lfs, comparator, valueClass);
+ new SequenceFile.Sorter(lfs, comparator, valueClass, job);
sorter.sort(file, sortedFile); // sort
lfs.delete(new File(file)); // remove unsorted
@@ -240,7 +243,7 @@
// make output collector
String name = getOutputName(getPartition());
final RecordWriter out =
- job.getOutputFormat().getRecordWriter(NutchFileSystem.get(), job, name);
+ job.getOutputFormat().getRecordWriter(NutchFileSystem.get(job), job, name);
OutputCollector collector = new OutputCollector() {
public void collect(WritableComparable key, Writable value)
throws IOException {
@@ -250,7 +253,7 @@
};
// apply reduce function
- SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile);
+ SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
Reporter reporter = getReporter(umbilical, getProgress());
long length = lfs.getLength(new File(sortedFile));
try {
@@ -266,7 +269,6 @@
lfs.delete(new File(sortedFile)); // remove sorted
out.close(reporter);
}
-
done(umbilical);
}
@@ -282,6 +284,16 @@
private static synchronized String getOutputName(int partition) {
return "part-" + NUMBER_FORMAT.format(partition);
+ }
+
+ public void setConf(NutchConf conf) {
+ this.nutchConf = conf;
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(conf);
+ }
+
+ public NutchConf getConf() {
+ return this.nutchConf;
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Tue Jan 31 08:08:58 2006
@@ -28,15 +28,18 @@
class ReduceTaskRunner extends TaskRunner {
private static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.mapred.ReduceTaskRunner");
+ private MapOutputFile mapOutputFile;
- public ReduceTaskRunner(Task task, TaskTracker tracker) {
- super(task, tracker);
+ public ReduceTaskRunner(Task task, TaskTracker tracker, NutchConf nutchConf) {
+ super(task, tracker, nutchConf);
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(nutchConf);
}
/** Assemble all of the map output files. */
public void prepare() throws IOException {
ReduceTask task = ((ReduceTask)getTask());
- MapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
+ this.mapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
String[] mapTaskIds = task.getMapTaskIds();
final Progress copyPhase = getTask().getProgress().phase();
@@ -74,9 +77,9 @@
InetSocketAddress addr =
new InetSocketAddress(loc.getHost(), loc.getPort());
MapOutputProtocol client =
- (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr);
+ (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr, this.nutchConf);
- MapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter(){
+ this.mapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter() {
public void progress(float progress) {
copyPhase.phase().set(progress);
try {
@@ -104,7 +107,7 @@
+loc.getMapTaskId()+" from "+addr,
e);
} finally {
- MapOutputFile.setProgressReporter(null);
+ this.mapOutputFile.setProgressReporter(null);
}
}
@@ -116,7 +119,7 @@
/** Delete all of the temporary map output files. */
public void close() throws IOException {
getTask().getProgress().setStatus("closed");
- MapOutputFile.removeAll(getTask().getTaskId());
+ this.mapOutputFile.removeAll(getTask().getTaskId());
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Tue Jan 31 08:08:58 2006
@@ -54,7 +54,7 @@
reporter.setStatus(split.toString());
- return new SequenceFileRecordReader(fs, split);
+ return new SequenceFileRecordReader(job, split);
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java Tue Jan 31 08:08:58 2006
@@ -26,6 +26,7 @@
import org.apache.nutch.io.SequenceFile;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.Writable;
+import org.apache.nutch.util.NutchConf;
public class SequenceFileOutputFormat implements OutputFormat {
@@ -53,8 +54,9 @@
}
/** Open the output generated by this format. */
- public static SequenceFile.Reader[] getReaders(NutchFileSystem fs, File dir)
+ public static SequenceFile.Reader[] getReaders(NutchConf nutchConf, File dir)
throws IOException {
+ NutchFileSystem fs = NutchFileSystem.get(nutchConf);
File[] names = fs.listFiles(dir);
// sort names, so that hash partitioning works
@@ -62,7 +64,7 @@
SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
- parts[i] = new SequenceFile.Reader(fs, names[i].toString());
+ parts[i] = new SequenceFile.Reader(fs, names[i].toString(), nutchConf);
}
return parts;
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Tue Jan 31 08:08:58 2006
@@ -26,6 +26,7 @@
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.LongWritable;
import org.apache.nutch.io.UTF8;
+import org.apache.nutch.util.NutchConf;
/** An {@link RecordReader} for {@link SequenceFile}s. */
public class SequenceFileRecordReader implements RecordReader {
@@ -33,9 +34,10 @@
private long end;
private boolean more = true;
- public SequenceFileRecordReader(NutchFileSystem fs, FileSplit split)
+ public SequenceFileRecordReader(NutchConf nutchConf, FileSplit split)
throws IOException {
- this.in = new SequenceFile.Reader(fs, split.getFile().toString());
+ NutchFileSystem fs = NutchFileSystem.get(nutchConf);
+ this.in = new SequenceFile.Reader(fs, split.getFile().toString(), nutchConf);
this.end = split.getStart() + split.getLength();
if (split.getStart() > in.getPosition())
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/Task.java Tue Jan 31 08:08:58 2006
@@ -24,14 +24,13 @@
import java.util.*;
/** Base class for tasks. */
-public abstract class Task implements Writable {
+public abstract class Task implements Writable, NutchConfigurable {
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
private String jobFile; // job configuration file
private String taskId; // unique, includes job id
-
////////////////////////////////////////////
// Constructors
////////////////////////////////////////////
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java Tue Jan 31 08:08:58 2006
@@ -37,9 +37,12 @@
private Task t;
private TaskTracker tracker;
- public TaskRunner(Task t, TaskTracker tracker) {
+ protected NutchConf nutchConf;
+
+ public TaskRunner(Task t, TaskTracker tracker, NutchConf nutchConf) {
this.t = t;
this.tracker = tracker;
+ this.nutchConf = nutchConf;
}
public Task getTask() { return t; }
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Jan 31 08:08:58 2006
@@ -33,13 +33,9 @@
* @author Mike Cafarella
*******************************************************/
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
- private static final int MAX_CURRENT_TASKS =
- NutchConf.get().getInt("mapred.tasktracker.tasks.maximum", 2);
-
+ private int maxCurrentTask;
static final long WAIT_FOR_DONE = 3 * 1000;
-
- static final long TASK_TIMEOUT =
- NutchConf.get().getInt("mapred.task.timeout", 10* 60 * 1000);
+ private long taskTimeout;
static final int STALE_STATE = 1;
@@ -68,6 +64,7 @@
static final String SUBDIR = "taskTracker";
private NutchConf fConf;
+ private MapOutputFile mapOutputFile;
/**
* Start with the local machine name, and the default JobTracker
@@ -82,6 +79,10 @@
public TaskTracker(InetSocketAddress jobTrackAddr, NutchConf conf) throws IOException {
this.fConf = conf;
this.jobTrackAddr = jobTrackAddr;
+ this.maxCurrentTask = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+ this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(conf);
initialize();
}
@@ -94,7 +95,7 @@
this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
this.localHostname = InetAddress.getLocalHost().getHostName();
- JobConf.deleteLocalFiles(SUBDIR);
+ new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
// Clear out state tables
this.tasks = new TreeMap();
@@ -107,7 +108,7 @@
// RPC initialization
while (true) {
try {
- this.taskReportServer = RPC.getServer(this, this.taskReportPort, MAX_CURRENT_TASKS, false);
+ this.taskReportServer = RPC.getServer(this, this.taskReportPort, this.maxCurrentTask, false, this.fConf);
this.taskReportServer.start();
break;
} catch (BindException e) {
@@ -118,7 +119,7 @@
}
while (true) {
try {
- this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, MAX_CURRENT_TASKS, false);
+ this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, this.maxCurrentTask, false, this.fConf);
this.mapOutputServer.start();
break;
} catch (BindException e) {
@@ -128,10 +129,10 @@
}
// Clear out temporary files that might be lying around
- MapOutputFile.cleanupStorage();
+ this.mapOutputFile.cleanupStorage();
this.justStarted = true;
- this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr);
+ this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
}
/**
@@ -164,7 +165,7 @@
}
// Clear local storage
- MapOutputFile.cleanupStorage();
+ this.mapOutputFile.cleanupStorage();
}
/**
@@ -213,7 +214,7 @@
// Xmit the heartbeat
//
if (justStarted) {
- this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName());
+ this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
}
int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted);
@@ -226,10 +227,10 @@
//
// Check if we should create a new Task
//
- if (runningTasks.size() < MAX_CURRENT_TASKS) {
+ if (runningTasks.size() < this.maxCurrentTask) {
Task t = jobClient.pollForNewTask(taskTrackerName);
if (t != null) {
- TaskInProgress tip = new TaskInProgress(t);
+ TaskInProgress tip = new TaskInProgress(t, this.fConf);
synchronized (this) {
tasks.put(t.getTaskId(), tip);
runningTasks.put(t.getTaskId(), tip);
@@ -245,7 +246,7 @@
for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
TaskInProgress tip = (TaskInProgress) it.next();
if ((tip.getRunState() == TaskStatus.RUNNING) &&
- (System.currentTimeMillis() - tip.getLastProgressReport() > TASK_TIMEOUT)) {
+ (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout)) {
LOG.info("Task " + tip.getTask().getTaskId() + " timed out. Killing.");
tip.reportDiagnosticInfo("Timed out.");
tip.killAndCleanup();
@@ -321,13 +322,15 @@
TaskRunner runner;
boolean done = false;
boolean wasKilled = false;
+ private JobConf jobConf;
/**
*/
- public TaskInProgress(Task task) throws IOException {
+ public TaskInProgress(Task task, NutchConf nutchConf) throws IOException {
this.task = task;
this.lastProgressReport = System.currentTimeMillis();
- JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
+ this.jobConf = new JobConf(nutchConf);
+ this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
localizeTask(task);
}
@@ -337,9 +340,9 @@
*/
void localizeTask(Task t) throws IOException {
File localJobFile =
- JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
+ this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
File localJarFile =
- JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");
+ this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");
String jobFile = t.getJobFile();
fs.copyToLocalFile(new File(jobFile), localJobFile);
@@ -501,7 +504,7 @@
runner.close();
} catch (IOException ie) {
}
- JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
+ this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
}
}
@@ -509,11 +512,14 @@
// MapOutputProtocol
/////////////////////////////////////////////////////////////////
public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
- IntWritable partition) {
- return new MapOutputFile(mapTaskId, reduceTaskId, partition.get());
- }
+ IntWritable partition) {
+ MapOutputFile mapOutputFile = new MapOutputFile(mapTaskId, reduceTaskId,
+ partition.get());
+ mapOutputFile.setConf(this.fConf);
+ return mapOutputFile;
+ }
- /////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////
// TaskUmbilicalProtocol
/////////////////////////////////////////////////////////////////
/**
@@ -586,16 +592,17 @@
LogFormatter.showTime(false);
LOG.info("Child starting");
+ NutchConf nutchConf = new NutchConf();
int port = Integer.parseInt(args[0]);
String taskid = args[1];
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
- new InetSocketAddress(port));
+ new InetSocketAddress(port), nutchConf);
Task task = umbilical.getTask(taskid);
JobConf job = new JobConf(task.getJobFile());
- NutchConf.get().addConfResource(new File(task.getJobFile()));
+ nutchConf.addConfResource(new File(task.getJobFile()));
startPinging(umbilical, taskid); // start pinging parent
@@ -646,7 +653,7 @@
System.exit(-1);
}
- TaskTracker tt = new TaskTracker(NutchConf.get());
+ TaskTracker tt = new TaskTracker(new NutchConf());
tt.run();
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/demo/Grep.java Tue Jan 31 08:08:58 2006
@@ -45,7 +45,7 @@
System.exit(-1);
}
- NutchConf defaults = NutchConf.get();
+ NutchConf defaults = new NutchConf();
File tempDir =
new File("grep-temp-"+
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/DataNode.java Tue Jan 31 08:08:58 2006
@@ -85,8 +85,8 @@
* Needs a directory to find its data (and config info)
*/
public DataNode(String machineName, File datadir, InetSocketAddress nameNodeAddr, NutchConf conf) throws IOException {
- this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr);
- this.data = new FSDataset(datadir);
+ this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr, conf);
+ this.data = new FSDataset(datadir, conf);
ServerSocket ss = null;
int tmpPort = conf.getInt("ndfs.datanode.port", 50010);
@@ -748,6 +748,6 @@
*/
public static void main(String args[]) throws IOException {
LogFormatter.setShowThreadIDs(true);
- runAndWait(NutchConf.get());
+ runAndWait(new NutchConf());
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSConstants.java Tue Jan 31 08:08:58 2006
@@ -107,8 +107,8 @@
public static long LEASE_PERIOD = 60 * 1000;
public static int READ_TIMEOUT = 60 * 1000;
- public static int BUFFER_SIZE =
- NutchConf.get().getInt("io.file.buffer.size", 4096);
+ //TODO mb@media-style.com: should be nutchConf injected?
+ public static final int BUFFER_SIZE = new NutchConf().getInt("io.file.buffer.size", 4096);
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java Tue Jan 31 08:08:58 2006
@@ -191,7 +191,7 @@
/**
* An FSDataset has a directory where it loads its data files.
*/
- public FSDataset(File dir) throws IOException {
+ public FSDataset(File dir, NutchConf nutchConf) throws IOException {
this.dirpath = dir.getCanonicalPath();
this.data = new File(dir, "data");
if (! data.exists()) {
@@ -199,7 +199,7 @@
}
this.tmp = new File(dir, "tmp");
if (tmp.exists()) {
- FileUtil.fullyDelete(tmp);
+ FileUtil.fullyDelete(tmp, nutchConf);
}
this.tmp.mkdirs();
this.dirTree = new FSDir(data);
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java Tue Jan 31 08:08:58 2006
@@ -34,27 +34,7 @@
public class FSNamesystem implements FSConstants {
public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.fs.FSNamesystem");
- // DESIRED_REPLICATION is how many copies we try to have at all times
- final static int DESIRED_REPLICATION =
- NutchConf.get().getInt("ndfs.replication", 3);
-
- // The maximum number of replicates we should allow for a single block
- final static int MAX_REPLICATION = DESIRED_REPLICATION;
-
- // How many outgoing replication streams a given node should have at one time
- final static int MAX_REPLICATION_STREAMS = NutchConf.get().getInt("ndfs.max-repl-streams", 2);
-
- // MIN_REPLICATION is how many copies we need in place or else we disallow the write
- final static int MIN_REPLICATION = 1;
-
- // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
- final static long HEARTBEAT_RECHECK = 1000;
-
- // Whether we should use disk-availability info when determining target
- final static boolean USE_AVAILABILITY = NutchConf.get().getBoolean("ndfs.availability.allocation", false);
-
- private boolean allowSameHostTargets =
- NutchConf.get().getBoolean("test.ndfs.same.host.targets.allowed", false);
+
//
// Stores the correct file name hierarchy
@@ -148,18 +128,44 @@
Daemon hbthread = null, lmthread = null;
boolean fsRunning = true;
long systemStart = 0;
+ private NutchConf nutchConf;
+
+ // DESIRED_REPLICATION is how many copies we try to have at all times
+ private int desiredReplication;
+ // The maximum number of replicas we should allow for a single block
+ private int maxReplication;
+ // How many outgoing replication streams a given node should have at one time
+ private int maxReplicationStreams;
+ // MIN_REPLICATION is how many copies we need in place or else we disallow the write
+ private int minReplication;
+ // HEARTBEAT_RECHECK is how often a datanode sends its heartbeat
+ private int heartBeatRecheck;
+ // Whether we should use disk-availability info when determining target
+ private boolean useAvailability;
+
+ private boolean allowSameHostTargets;
/**
* dir is where the filesystem directory state
* is stored
*/
- public FSNamesystem(File dir) throws IOException {
+ public FSNamesystem(File dir, NutchConf nutchConf) throws IOException {
this.dir = new FSDirectory(dir);
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(new LeaseMonitor());
hbthread.start();
lmthread.start();
this.systemStart = System.currentTimeMillis();
+ this.nutchConf = nutchConf;
+
+ this.desiredReplication = nutchConf.getInt("ndfs.replication", 3);
+ this.maxReplication = desiredReplication;
+ this.maxReplicationStreams = nutchConf.getInt("ndfs.max-repl-streams", 2);
+ this.minReplication = 1;
+ this.heartBeatRecheck= 1000;
+ this.useAvailability = nutchConf.getBoolean("ndfs.availability.allocation", false);
+ this.allowSameHostTargets =
+ nutchConf.getBoolean("test.ndfs.same.host.targets.allowed", false);
}
/** Close down this filesystem manager.
@@ -244,10 +250,10 @@
results = new Object[2];
// Get the array of replication targets
- DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
- if (targets.length < MIN_REPLICATION) {
+ DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
+ if (targets.length < this.minReplication) {
LOG.warning("Target-length is " + targets.length +
- ", below MIN_REPLICATION (" + MIN_REPLICATION + ")");
+ ", below MIN_REPLICATION (" + this.minReplication+ ")");
return null;
}
@@ -300,8 +306,8 @@
//
if (checkFileProgress(src)) {
// Get the array of replication targets
- DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
- if (targets.length < MIN_REPLICATION) {
+ DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
+ if (targets.length < this.minReplication) {
return null;
}
@@ -411,9 +417,9 @@
// the blocks.
for (int i = 0; i < pendingBlocks.length; i++) {
TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
- if (containingNodes.size() < DESIRED_REPLICATION) {
+ if (containingNodes.size() < this.desiredReplication) {
synchronized (neededReplications) {
- LOG.info("Completed file " + src + ", at holder " + holder + ". There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + DESIRED_REPLICATION);
+ LOG.info("Completed file " + src + ", at holder " + holder + ". There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + this.desiredReplication);
neededReplications.add(pendingBlocks[i]);
}
}
@@ -449,7 +455,7 @@
for (Iterator it = v.iterator(); it.hasNext(); ) {
Block b = (Block) it.next();
TreeSet containingNodes = (TreeSet) blocksMap.get(b);
- if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) {
+ if (containingNodes == null || containingNodes.size() < this.minReplication) {
return false;
}
}
@@ -814,7 +820,7 @@
while (fsRunning) {
heartbeatCheck();
try {
- Thread.sleep(HEARTBEAT_RECHECK);
+ Thread.sleep(heartBeatRecheck);
} catch (InterruptedException ie) {
}
}
@@ -946,10 +952,10 @@
synchronized (neededReplications) {
if (dir.isValidBlock(block)) {
- if (containingNodes.size() >= DESIRED_REPLICATION) {
+ if (containingNodes.size() >= this.desiredReplication) {
neededReplications.remove(block);
pendingReplications.remove(block);
- } else if (containingNodes.size() < DESIRED_REPLICATION) {
+ } else if (containingNodes.size() < this.desiredReplication) {
if (! neededReplications.contains(block)) {
neededReplications.add(block);
}
@@ -968,8 +974,8 @@
nonExcess.add(cur);
}
}
- if (nonExcess.size() > MAX_REPLICATION) {
- chooseExcessReplicates(nonExcess, block, MAX_REPLICATION);
+ if (nonExcess.size() > this.maxReplication) {
+ chooseExcessReplicates(nonExcess, block, this.maxReplication);
}
}
}
@@ -1032,7 +1038,7 @@
// necessary. In that case, put block on a possibly-will-
// be-replicated list.
//
- if (dir.isValidBlock(block) && (containingNodes.size() < DESIRED_REPLICATION)) {
+ if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) {
synchronized (neededReplications) {
neededReplications.add(block);
}
@@ -1147,7 +1153,7 @@
//
// We can only reply with 'maxXfers' or fewer blocks
//
- if (scheduledXfers >= MAX_REPLICATION_STREAMS - xmitsInProgress) {
+ if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) {
break;
}
@@ -1157,7 +1163,7 @@
} else {
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
if (containingNodes.contains(srcNode)) {
- DatanodeInfo targets[] = chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
+ DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes);
if (targets.length > 0) {
// Build items to return
replicateBlocks.add(block);
@@ -1181,7 +1187,7 @@
DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
- if (containingNodes.size() + targets.length >= DESIRED_REPLICATION) {
+ if (containingNodes.size() + targets.length >= this.desiredReplication) {
neededReplications.remove(block);
pendingReplications.add(block);
}
@@ -1313,7 +1319,7 @@
" forbidden2.size()=" +
( forbidden2 != null ? forbidden2.size() : 0 ));
return null;
- } else if (! USE_AVAILABILITY) {
+ } else if (! this.useAvailability) {
int target = r.nextInt(targetList.size());
return (DatanodeInfo) targetList.elementAt(target);
} else {
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NDFSClient.java Tue Jan 31 08:08:58 2006
@@ -42,8 +42,8 @@
/** Create a new NDFSClient connected to the given namenode server.
*/
- public NDFSClient(InetSocketAddress nameNodeAddr) {
- this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr);
+ public NDFSClient(InetSocketAddress nameNodeAddr, NutchConf nutchConf) {
+ this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, nutchConf);
this.clientName = "NDFSClient_" + r.nextInt();
this.leaseChecker = new Daemon(new LeaseChecker());
this.leaseChecker.start();
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/NameNode.java Tue Jan 31 08:08:58 2006
@@ -48,21 +48,20 @@
/**
* Create a NameNode at the default location
*/
- public NameNode() throws IOException {
- this(new File(NutchConf.get().get("ndfs.name.dir",
+ public NameNode(NutchConf nutchConf) throws IOException {
+ this(new File(nutchConf.get("ndfs.name.dir",
"/tmp/nutch/ndfs/name")),
DataNode.createSocketAddr
- (NutchConf.get().get("fs.default.name", "local")).getPort());
+ (nutchConf.get("fs.default.name", "local")).getPort(), nutchConf);
}
/**
* Create a NameNode at the specified location and start it.
*/
- public NameNode(File dir, int port) throws IOException {
- this.namesystem = new FSNamesystem(dir);
- this.handlerCount =
- NutchConf.get().getInt("ndfs.namenode.handler.count", 10);
- this.server = RPC.getServer(this, port, handlerCount, false);
+ public NameNode(File dir, int port, NutchConf nutchConf) throws IOException {
+ this.namesystem = new FSNamesystem(dir, nutchConf);
+ this.handlerCount = nutchConf.getInt("ndfs.namenode.handler.count", 10);
+ this.server = RPC.getServer(this, port, handlerCount, false, nutchConf);
this.server.start();
}
@@ -346,7 +345,7 @@
/**
*/
public static void main(String argv[]) throws IOException, InterruptedException {
- NameNode namenode = new NameNode();
+ NameNode namenode = new NameNode(new NutchConf());
namenode.join();
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java?rev=373853&r1=373852&r2=373853&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/net/BasicUrlNormalizer.java Tue Jan 31 08:08:58 2006
@@ -23,6 +23,7 @@
import java.util.logging.Logger;
import org.apache.nutch.util.LogFormatter;
+import org.apache.nutch.util.NutchConf;
import org.apache.oro.text.regex.*;
/** Converts URLs to a normal form . */
@@ -39,6 +40,8 @@
private Rule relativePathRule = null;
private Rule leadingRelativePathRule = null;
+ private NutchConf nutchConf;
+
public BasicUrlNormalizer() {
try {
// this pattern tries to find spots like "/xx/../" in the url, which
@@ -171,6 +174,15 @@
public Perl5Pattern pattern;
public Perl5Substitution substitution;
}
+
+
+ public void setConf(NutchConf conf) {
+ this.nutchConf = conf;
+ }
+
+ public NutchConf getConf() {
+ return this.nutchConf;
+ }
}