You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/06 23:48:41 UTC

svn commit: r209522 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java

Author: cutting
Date: Wed Jul  6 14:48:39 2005
New Revision: 209522

URL: http://svn.apache.org/viewcvs?rev=209522&view=rev
Log:
Fix sorting to work with the new sync method.  Sorting prefixes temporary
data with its length, which is hard to compute when sync markers are
interleaved with the records, so syncs are no longer written to, or
expected when reading, temporary sort files.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=209522&r1=209521&r2=209522&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Jul  6 14:48:39 2005
@@ -59,7 +59,7 @@
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
     private long lastSyncPos;                     // position of last sync
-    private final byte[] sync;                    // 16 random bytes
+    private byte[] sync;                          // 16 random bytes
     {
       try {                                       // use hash of uid + host
         MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -147,7 +147,8 @@
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      if (out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+      if (sync != null &&
+          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
         lastSyncPos = out.getPos();               // update lastSyncPos
         //LOG.info("sync@"+lastSyncPos);
         out.writeInt(SYNC_ESCAPE);                // escape it
@@ -300,7 +301,8 @@
 
       int length = in.readInt();
 
-      if (version[3] > 1 && length == SYNC_ESCAPE) { // process a sync entry
+      if (version[3] > 1 && sync != null &&
+          length == SYNC_ESCAPE) {                // process a sync entry
         //LOG.info("sync@"+in.getPos());
         in.readFully(syncCheck);                  // read syncCheck
         if (!Arrays.equals(sync, syncCheck))      // check it
@@ -529,13 +531,15 @@
 
           long length = buffer.getLength();       // compute its size
           length += count*8;                      // allow for length/keyLength
-          length += (count/SYNC_INTERVAL)*SYNC_SIZE; // allow for syncs
 
           out.writeLong(length);                  // write size
           out.writeLong(count);                   // write count
         }
 
         Writer writer = new Writer(out, keyClass, valClass);
+        if (!done) {
+          writer.sync = null;                     // disable sync on temp files
+        }
 
         for (int i = 0; i < count; i++) {         // write in sorted order
           int p = pointers[i];
@@ -640,12 +644,13 @@
             long count = in.readLong();
 
             totalLength += length;
-            totalLength -= (count/SYNC_INTERVAL)*SYNC_SIZE; // remove syncs
 
             totalCount+= count;
 
             Reader reader = new Reader(nfs, inName, memory/(factor+1),
                                        in.getPos(), length);
+            reader.sync = null;                   // disable sync on temp files
+
             MergeStream ms = new MergeStream(reader); // add segment to queue
             if (ms.next()) {
               queue.put(ms);
@@ -654,7 +659,6 @@
           }
 
           if (!last) {                             // intermediate file
-            totalLength += (totalCount/SYNC_INTERVAL)*SYNC_SIZE; // add syncs
             queue.out.writeLong(totalLength);     // write size
             queue.out.writeLong(totalCount);      // write count
           }