You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/06 20:37:44 UTC

svn commit: r209495 - in /lucene/nutch/branches/mapred/src: java/org/apache/nutch/io/ java/org/apache/nutch/mapred/ test/org/apache/nutch/mapred/

Author: cutting
Date: Wed Jul  6 11:37:44 2005
New Revision: 209495

URL: http://svn.apache.org/viewcvs?rev=209495&view=rev
Log:
Add unit test for SequenceFile InputFormat.  Fix code to pass unit test.  SequenceFile now inserts sync marks after a fixed number of bytes rather than after a fixed number of entries.

Added:
    lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/
    lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java
Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Jul  6 11:37:44 2005
@@ -39,10 +39,12 @@
   };
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
-  private static final int SYNC_INTERVAL = 10;    // num entries between syncs
   private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
   private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
 
+  /** The number of bytes between sync points.*/
+  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
+
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer {
     private NFSDataOutputStream out;
@@ -56,7 +58,7 @@
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
-    private long count;                           // number of entries added
+    private long lastSyncPos;                     // position of last sync
     private final byte[] sync;                    // 16 random bytes
     {
       try {                                       // use hash of uid + host
@@ -145,7 +147,9 @@
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      if ((++count % SYNC_INTERVAL) == 0) {       // time to emit sync
+      if (out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+        lastSyncPos = out.getPos();               // update lastSyncPos
+        //LOG.info("sync@"+lastSyncPos);
         out.writeInt(SYNC_ESCAPE);                // escape it
         out.write(sync);                          // write sync
       }
@@ -297,6 +301,7 @@
       int length = in.readInt();
 
       if (version[3] > 1 && length == SYNC_ESCAPE) { // process a sync entry
+        //LOG.info("sync@"+in.getPos());
         in.readFully(syncCheck);                  // read syncCheck
         if (!Arrays.equals(sync, syncCheck))      // check it
           throw new IOException("File is corrupt!");
@@ -318,12 +323,12 @@
 
     /** Seek to the next sync mark past a given position.*/
     public synchronized void sync(long position) throws IOException {
-      if (position+sync.length >= end) {
+      if (position+SYNC_SIZE >= end) {
         seek(end);
         return;
       }
 
-      seek(position);
+      seek(position+4);                           // skip escape
       in.readFully(syncCheck);
       int syncLen = sync.length;
       for (int i = 0; in.getPos() < end; i++) {
@@ -332,8 +337,10 @@
           if (sync[j] != syncCheck[(i+j)%syncLen])
             break;
         }
-        if (j == syncLen)
+        if (j == syncLen) {
+          in.seek(in.getPos() - SYNC_SIZE);  // position before sync
           return;
+        }
         syncCheck[i%syncLen] = in.readByte();
       }
     }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java Wed Jul  6 11:37:44 2005
@@ -56,6 +56,7 @@
   /** The number of bytes in the file to process. */
   public long getLength() { return length; }
 
+  public String toString() { return file + ":" + start + "+" + length; } // file:start+length
 
   ////////////////////////////////////////////
   // Writable methods

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java Wed Jul  6 11:37:44 2005
@@ -21,14 +21,25 @@
 
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.logging.Logger;
 
 import org.apache.nutch.fs.NutchFileSystem;
+import org.apache.nutch.util.LogFormatter;
 
 /** A base class for {@link InputFormat}. */
 public abstract class InputFormatBase implements InputFormat {
 
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.mapred.InputFormatBase");
+
   private static final double SPLIT_SLOP = 0.1;   // 10% slop
 
+  private int minSplitSize = 1;
+
+  protected void setMinSplitSize(int minSplitSize) { // floor for bytesPerSplit
+    this.minSplitSize = minSplitSize;
+  }
+
   public abstract RecordReader getRecordReader(NutchFileSystem fs,
                                                FileSplit split,
                                                JobConf job) throws IOException;
@@ -84,8 +95,11 @@
       totalSize += fs.getLength(files[i]);
     }
 
-    long bytesPerSplit = totalSize / numSplits;
+    long bytesPerSplit = Math.max(totalSize / numSplits, minSplitSize);
     long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
+
+    //LOG.info("bytesPerSplit = " + bytesPerSplit);
+    //LOG.info("maxPerSplit = " + maxPerSplit);
 
     ArrayList splits = new ArrayList(numSplits);  // generate splits
     for (int i = 0; i < files.length; i++) {

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Wed Jul  6 11:37:44 2005
@@ -31,6 +31,10 @@
 /** An {@link InputFormat} for {@link SequenceFile}s. */
 public class SequenceFileInputFormat extends InputFormatBase {
 
+  public SequenceFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);  // bytes between sync points
+  }
+
   protected File[] listFiles(NutchFileSystem fs, JobConf job)
     throws IOException {
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Wed Jul  6 11:37:44 2005
@@ -38,7 +38,10 @@
     this.in = new SequenceFile.Reader(fs, split.getFile().toString());
     this.end = split.getStart() + split.getLength();
 
-    in.sync(split.getStart());                    // sync to start
+    if (split.getStart() > in.getPosition())
+      in.sync(split.getStart());                  // sync to start
+
+    more = in.getPosition() < end;
   }
 
 

Added: lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java?rev=209495&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java (added)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java Wed Jul  6 11:37:44 2005
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.nutch.fs.*;
+import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
+
+/** Checks that SequenceFileInputFormat assigns each entry to one split. */
+public class TestSequenceFileInputFormat extends TestCase {
+  private static final Logger LOG = InputFormatBase.LOG;
+
+  /** Exclusive upper bound on the number of entries written per file. */
+  private static final int MAX_LENGTH = 10000;
+
+  /** Splits files of many sizes; each key must land in exactly one split. */
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf(NutchConf.get());
+    NutchFileSystem fs = NutchFileSystem.getNamed("local");
+    File dir = new File(System.getProperty("test.build.data",".") + "/mrtest");
+    File file = new File(dir, "test.seq");
+
+    int seed = new Random().nextInt();
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+
+    dir.mkdirs();
+    job.setInputDir(dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+      LOG.info("creating; entries = " + length);
+
+      // create a file with length entries
+      file.delete();
+      SequenceFile.Writer writer =
+        new SequenceFile.Writer(fs, file.toString(),
+                                IntWritable.class, BytesWritable.class);
+      try {
+        for (int i = 0; i < length; i++) {
+          IntWritable key = new IntWritable(i);
+          byte[] data = new byte[random.nextInt(10)];
+          random.nextBytes(data);
+          BytesWritable value = new BytesWritable(data);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      InputFormat format = new SequenceFileInputFormat();
+      IntWritable key = new IntWritable();
+      BytesWritable value = new BytesWritable();
+      for (int i = 0; i < 3; i++) {
+        int numSplits =
+          random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+        LOG.info("splitting: requesting = " + numSplits);
+        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        LOG.info("splitting: got =        " + splits.length);
+
+        // check each split; bit k is set once key k has been read
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          RecordReader reader = format.getRecordReader(fs, splits[j], job);
+          try {
+            while (reader.next(key, value)) {
+              assertFalse("Key in multiple partitions.", bits.get(key.get()));
+              bits.set(key.get());
+            }
+          } finally {
+            reader.close();                     // don't leak one handle per split
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+    }
+  }
+
+  /** Runs the test without a JUnit runner. */
+  public static void main(String[] args) throws Exception {
+    new TestSequenceFileInputFormat().testFormat();
+  }
+}