You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/06 23:48:41 UTC
svn commit: r209522 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
Author: cutting
Date: Wed Jul 6 14:48:39 2005
New Revision: 209522
URL: http://svn.apache.org/viewcvs?rev=209522&view=rev
Log:
Fix sorting to work with the new sync method. Sorting prefixes each
segment of temporary data with its length, which is hard to compute when
sync markers are interleaved with the records, so syncs are no longer
stored in temporary sort files.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=209522&r1=209521&r2=209522&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Jul 6 14:48:39 2005
@@ -59,7 +59,7 @@
// can seek into the middle of a file and then synchronize with record
// starts and ends by scanning for this value.
private long lastSyncPos; // position of last sync
- private final byte[] sync; // 16 random bytes
+ private byte[] sync; // 16 random bytes
{
try { // use hash of uid + host
MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -147,7 +147,8 @@
if (keyLength == 0)
throw new IOException("zero length keys not allowed");
- if (out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+ if (sync != null &&
+ out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
lastSyncPos = out.getPos(); // update lastSyncPos
//LOG.info("sync@"+lastSyncPos);
out.writeInt(SYNC_ESCAPE); // escape it
@@ -300,7 +301,8 @@
int length = in.readInt();
- if (version[3] > 1 && length == SYNC_ESCAPE) { // process a sync entry
+ if (version[3] > 1 && sync != null &&
+ length == SYNC_ESCAPE) { // process a sync entry
//LOG.info("sync@"+in.getPos());
in.readFully(syncCheck); // read syncCheck
if (!Arrays.equals(sync, syncCheck)) // check it
@@ -529,13 +531,15 @@
long length = buffer.getLength(); // compute its size
length += count*8; // allow for length/keyLength
- length += (count/SYNC_INTERVAL)*SYNC_SIZE; // allow for syncs
out.writeLong(length); // write size
out.writeLong(count); // write count
}
Writer writer = new Writer(out, keyClass, valClass);
+ if (!done) {
+ writer.sync = null; // disable sync on temp files
+ }
for (int i = 0; i < count; i++) { // write in sorted order
int p = pointers[i];
@@ -640,12 +644,13 @@
long count = in.readLong();
totalLength += length;
- totalLength -= (count/SYNC_INTERVAL)*SYNC_SIZE; // remove syncs
totalCount+= count;
Reader reader = new Reader(nfs, inName, memory/(factor+1),
in.getPos(), length);
+ reader.sync = null; // disable sync on temp files
+
MergeStream ms = new MergeStream(reader); // add segment to queue
if (ms.next()) {
queue.put(ms);
@@ -654,7 +659,6 @@
}
if (!last) { // intermediate file
- totalLength += (totalCount/SYNC_INTERVAL)*SYNC_SIZE; // add syncs
queue.out.writeLong(totalLength); // write size
queue.out.writeLong(totalCount); // write count
}