You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/06/22 01:29:39 UTC

svn commit: r670272 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/test/org/apache/hadoop/mapred/TestMapCollection.java

Author: cdouglas
Date: Sat Jun 21 16:29:38 2008
New Revision: 670272

URL: http://svn.apache.org/viewvc?rev=670272&view=rev
Log:
HADOOP-3603. Fix MapOutputCollector to spill when io.sort.spill.percent is
1.0 and to detect spills when emitted records write no data.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=670272&r1=670271&r2=670272&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Jun 21 16:29:38 2008
@@ -649,6 +649,9 @@
     HADOOP-3590. Null pointer exception in JobTracker when the task tracker is 
     not yet resolved. (Amar Ramesh Kamat via ddas)
 
+    HADOOP-3603. Fix MapOutputCollector to spill when io.sort.spill.percent is
+    1.0 and to detect spills when emitted records write no data. (cdouglas)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=670272&r1=670271&r2=670272&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Sat Jun 21 16:29:38 2008
@@ -428,7 +428,7 @@
           // serialize key bytes into buffer
         int keystart = bufindex;
         keySerializer.serialize(key);
-        if (bufindex < keystart || bufindex == bufvoid) {
+        if (bufindex < keystart) {
           // wrapped the key; reset required
           bb.reset();
           keystart = 0;
@@ -437,16 +437,21 @@
         int valstart = bufindex;
         valSerializer.serialize(value);
         int valend = bb.markRecord();
-        mapOutputByteCounter.increment(valend > keystart
-                ? valend - keystart
-                        : (bufvoid - keystart) + valend);
+        mapOutputByteCounter.increment(valend >= keystart
+            ? valend - keystart
+            : (bufvoid - keystart) + valend);
+
+        if (keystart == bufindex) {
+          // if emitted records make no writes, it's possible to wrap
+          // accounting space without notice
+          bb.write(new byte[0], 0, 0);
+        }
 
         int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
               partition + ")");
         }
-
         mapOutputRecordCounter.increment(1);
 
         // update accounting info
@@ -457,7 +462,7 @@
         kvindices[ind + VALSTART] = valstart;
         kvindex = (kvindex + 1) % kvoffsets.length;
       } catch (MapBufferTooSmallException e) {
-        LOG.debug("Record too large for in-memory buffer: " + e.getMessage());
+        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
         spillSingleRecord(key, value);
         mapOutputRecordCounter.increment(1);
         return;
@@ -585,7 +590,8 @@
             }
 
             // sufficient accounting space?
-            kvfull = (kvindex + 1) % kvoffsets.length == kvstart;
+            final int kvnext = (kvindex + 1) % kvoffsets.length;
+            kvfull = kvnext == kvstart;
             // sufficient buffer space?
             if (bufstart <= bufend && bufend <= bufindex) {
               buffull = bufindex + len > bufvoid;
@@ -601,18 +607,18 @@
               // spill thread not running
               if (kvend != kvindex) {
                 // we have records we can spill
-                final boolean kvsoftlimit = (kvindex > kvend)
-                  ? kvindex - kvend > softRecordLimit
-                  : kvend - kvindex < kvoffsets.length - softRecordLimit;
+                final boolean kvsoftlimit = (kvnext > kvend)
+                  ? kvnext - kvend > softRecordLimit
+                  : kvend - kvnext <= kvoffsets.length - softRecordLimit;
                 final boolean bufsoftlimit = (bufindex > bufend)
                   ? bufindex - bufend > softBufferLimit
                   : bufend - bufindex < bufvoid - softBufferLimit;
                 if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
                   LOG.info("Spilling map output: buffer full = " + bufsoftlimit+
                            " and record full = " + kvsoftlimit);
-                  LOG.info("bufindex = " + bufindex + "; bufend = " + bufend +
+                  LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                            "; bufvoid = " + bufvoid);
-                  LOG.info("kvindex = " + kvindex + "; kvend = " + kvend +
+                  LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
                            "; length = " + kvoffsets.length);
                   kvend = kvindex;
                   bufend = bufmark;
@@ -682,6 +688,10 @@
             ).initCause(sortSpillException);
       }
       if (kvend != kvindex) {
+        LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+                 "; bufvoid = " + bufvoid);
+        LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
+                 "; length = " + kvoffsets.length);
         kvend = kvindex;
         bufend = bufmark;
         sortAndSpill();
@@ -717,7 +727,7 @@
     private void sortAndSpill() throws IOException {
       //approximate the length of the output file to be the length of the
       //buffer + header lengths for the partitions
-      long size = (bufend > bufstart
+      long size = (bufend >= bufstart
           ? bufend - bufstart
           : (bufvoid - bufend) + bufstart) +
                   partitions * APPROX_HEADER_LENGTH;
@@ -730,7 +740,8 @@
         out = localFs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+                             getTaskID(), numSpills,
+                             partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         indexOut = localFs.create(indexFilename);
         final int endPosition = (kvend > kvstart)
           ? kvend
@@ -746,7 +757,6 @@
             if (null == combinerClass) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
-              long recordNo = 0;
               while (spindex < endPosition &&
                   kvindices[kvoffsets[spindex % kvoffsets.length]
                             + PARTITION] == i) {
@@ -757,7 +767,6 @@
                            kvindices[kvoff + KEYSTART]));
                 writer.append(key, value);
                 ++spindex;
-                ++recordNo;
               }
             } else {
               int spstart = spindex;
@@ -767,11 +776,7 @@
                 ++spindex;
               }
               // Note: we would like to avoid the combiner if we've fewer
-              // than some threshold of records for a partition, but we left
-              // our records uncompressed for the combiner. We accept the trip
-              // through the combiner to effect the compression for now;
-              // to remedy this would require us to observe the compression
-              // strategy here as we do in collect
+              // than some threshold of records for a partition
               if (spstart != spindex) {
                 combineCollector.setWriter(writer);
                 RawKeyValueIterator kvIter =
@@ -790,8 +795,8 @@
             if (null != writer) writer.close();
           }
         }
-        ++numSpills;
         LOG.info("Finished spill " + numSpills);
+        ++numSpills;
       } finally {
         if (out != null) out.close();
         if (indexOut != null) indexOut.close();
@@ -817,7 +822,8 @@
         out = localFs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+                             getTaskID(), numSpills,
+                             partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         indexOut = localFs.create(indexFilename);
         // we don't run the combiner for a single record
         for (int i = 0; i < partitions; ++i) {
@@ -874,8 +880,8 @@
      * deserialized value bytes. Should only be called during a spill.
      */
     private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
-      final int nextindex = ((kvoff/ACCTSIZE) == 
-                             ((kvend - 1 + kvoffsets.length) % kvoffsets.length))
+      final int nextindex = (kvoff / ACCTSIZE ==
+                            (kvend - 1 + kvoffsets.length) % kvoffsets.length)
         ? bufend
         : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
       int vallen = (nextindex >= kvindices[kvoff + VALSTART])

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java?rev=670272&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java Sat Jun 21 16:29:38 2008
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+public class TestMapCollection extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestMapCollection.class.getName());
+
+  public static class KeyWritable
+      implements WritableComparable<KeyWritable>, JobConfigurable {
+
+    private final byte c = (byte)('K' & 0xFF);
+    static private boolean pedantic = false;
+    protected int expectedlen;
+
+    public void configure(JobConf conf) {
+      expectedlen = conf.getInt("test.keywritable.length", 1);
+      pedantic = conf.getBoolean("test.pedantic.verification", false);
+    }
+
+    public KeyWritable() { }
+
+    public KeyWritable(int len) {
+      this();
+      expectedlen = len;
+    }
+
+    public int getLength() {
+      return expectedlen;
+    }
+
+    public int compareTo(KeyWritable o) {
+      if (o == this) return 0;
+      return expectedlen - o.getLength();
+    }
+
+    public boolean equals(Object o) {
+      if (o == this) return true;
+      if (!(o instanceof KeyWritable)) return false;
+      return 0 == compareTo((KeyWritable)o);
+    }
+
+    public int hashCode() {
+      return 37 * expectedlen;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      if (expectedlen != 0) {
+        int bytesread;
+        if (pedantic) {
+          for (int i = 0; i < expectedlen; ++i)
+            assertEquals("Invalid byte at " + i, c, in.readByte());
+          bytesread = expectedlen;
+        } else {
+          bytesread = in.skipBytes(expectedlen);
+        }
+        assertEquals("Too few bytes in record", expectedlen, bytesread);
+      }
+      // cannot verify that the stream has been exhausted
+    }
+
+    public void write(DataOutput out) throws IOException {
+      if (expectedlen != 0) {
+        if (expectedlen > 1024) {
+          byte[] b = new byte[expectedlen];
+          Arrays.fill(b, c);
+          out.write(b);
+        } else {
+          for (int i = 0; i < expectedlen; ++i) {
+            out.write(c);
+          }
+        }
+      }
+    }
+
+    public static class Comparator extends WritableComparator {
+      public Comparator() {
+        super(KeyWritable.class);
+      }
+
+      public int compare(byte[] b1, int s1, int l1,
+                         byte[] b2, int s2, int l2) {
+        if (pedantic) {
+          for (int i = s1; i < l1; ++i) {
+            assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF));
+          }
+          for (int i = s2; i < l2; ++i) {
+            assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF));
+          }
+        }
+        return l1 - l2;
+      }
+    }
+
+
+    static {
+      WritableComparator.define(KeyWritable.class, new Comparator());
+    }
+  }
+
+  public static class ValWritable extends KeyWritable {
+
+    private final byte c = (byte)('V' & 0xFF);
+
+    public ValWritable() { }
+
+    public ValWritable(int len) {
+      this();
+      expectedlen = len;
+    }
+
+    public void configure(JobConf conf) {
+      expectedlen = conf.getInt("test.valwritable.length", 1);
+    }
+  }
+
+  public static class SpillMapper
+      implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> {
+
+    private int keylen = 1;
+    private int vallen = 1;
+    private int numrecs = 100;
+
+    public void configure(JobConf job) {
+      keylen = job.getInt("test.keywritable.length", 1);
+      vallen = job.getInt("test.valwritable.length", 1);
+      numrecs = job.getInt("test.spillmap.records", 100);
+    }
+
+    public void map(NullWritable key, NullWritable value,
+        OutputCollector<KeyWritable,ValWritable> out, Reporter reporter)
+        throws IOException {
+      KeyWritable k = new KeyWritable(keylen);
+      ValWritable v = new ValWritable(vallen);
+      for (int i = 0; i < numrecs; ++i) {
+        if ((i % 1000) == 0) {
+          reporter.progress();
+        }
+        out.collect(k, v);
+      }
+    }
+
+    public void close() { }
+
+  }
+
+  public static class SpillReducer
+      implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {
+
+    private int numrecs = 100;
+
+    public void configure(JobConf job) {
+      numrecs = job.getInt("test.spillmap.records", 100);
+    }
+
+    public void reduce(KeyWritable k, Iterator<ValWritable> values,
+        OutputCollector<NullWritable,NullWritable> out, Reporter reporter) {
+      int i = 0;
+      while (values.hasNext()) {
+        values.next();
+        ++i;
+      }
+      assertEquals("Unexpected record count (" + i + "/" +
+                   numrecs + ")", numrecs, i);
+    }
+
+    public void close() { }
+
+  }
+
+  public static class FakeSplit implements InputSplit {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class FakeIF
+      implements InputFormat<NullWritable,NullWritable> {
+
+    public FakeIF() { }
+
+    public void validateInput(JobConf conf) { }
+
+    public InputSplit[] getSplits(JobConf conf, int splits) {
+      return new InputSplit[] { new FakeSplit() };
+    }
+
+    public RecordReader<NullWritable,NullWritable> getRecordReader(
+        InputSplit ignored, JobConf conf, Reporter reporter) {
+      return new RecordReader<NullWritable,NullWritable>() {
+        private boolean done = false;
+        public boolean next(NullWritable key, NullWritable value)
+            throws IOException {
+          if (done)
+            return false;
+          done = true;
+          return true;
+        }
+        public NullWritable createKey() { return NullWritable.get(); }
+        public NullWritable createValue() { return NullWritable.get(); }
+        public long getPos() throws IOException { return 0L; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException { return 0.0f; }
+      };
+    }
+  }
+
+  private static void runTest(String name, int keylen, int vallen,
+      int records, int ioSortMB, float recPer, float spillPer,
+      boolean pedantic) throws Exception {
+    JobConf conf = new JobConf(new Configuration(), SpillMapper.class);
+
+    conf.setInt("io.sort.mb", ioSortMB);
+    conf.set("io.sort.record.percent", Float.toString(recPer));
+    conf.set("io.sort.spill.percent", Float.toString(spillPer));
+
+    conf.setInt("test.keywritable.length", keylen);
+    conf.setInt("test.valwritable.length", vallen);
+    conf.setInt("test.spillmap.records", records);
+    conf.setBoolean("test.pedantic.verification", pedantic);
+
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setInputFormat(FakeIF.class);
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setMapperClass(SpillMapper.class);
+    conf.setReducerClass(SpillReducer.class);
+    conf.setMapOutputKeyClass(KeyWritable.class);
+    conf.setMapOutputValueClass(ValWritable.class);
+
+    LOG.info("Running " + name);
+    JobClient.runJob(conf);
+  }
+
+  private static void runTest(String name, int keylen, int vallen, int records,
+      boolean pedantic) throws Exception {
+    runTest(name, keylen, vallen, records, 1, 0.05f, .8f, pedantic);
+  }
+
+  public void testLastFill() throws Exception {
+    // last byte of record/key is the last/first byte in the spill buffer
+    runTest("vallastbyte", 128, 896, 1344, 1, 0.125f, 0.5f, true);
+    runTest("keylastbyte", 512, 1024, 896, 1, 0.125f, 0.5f, true);
+  }
+
+  public void testLargeRecords() throws Exception {
+    // maps emitting records larger than io.sort.mb
+    runTest("largerec", 100, 1024*1024, 5, false);
+    runTest("largekeyzeroval", 1024*1024, 0, 5, false);
+  }
+
+  public void testSpillPer() throws Exception {
+    // set non-default, 100% speculative spill boundary
+    runTest("fullspill2B", 1, 1, 10000, 1, 0.05f, 1.0f, true);
+    runTest("fullspill200B", 100, 100, 10000, 1, 0.05f, 1.0f, true);
+    runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 0.3f, 1.0f, true);
+    runTest("lt50perspill", 100, 100, 10000, 1, 0.05f, 0.3f, true);
+  }
+
+  public void testZeroLength() throws Exception {
+    // test key/value at zero-length
+    runTest("zeroval", 1, 0, 10000, true);
+    runTest("zerokey", 0, 1, 10000, true);
+    runTest("zerokeyval", 0, 0, 10000, false);
+    runTest("zerokeyvalfull", 0, 0, 10000, 1, 0.05f, 1.0f, false);
+  }
+
+}