You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/08 02:10:38 UTC

svn commit: r961542 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java

Author: cdouglas
Date: Thu Jul  8 00:10:38 2010
New Revision: 961542

URL: http://svn.apache.org/viewvc?rev=961542&view=rev
Log:
MAPREDUCE-1820. Fix InputSampler to clone sampled keys. Contributed by Alex Kozlov

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=961542&r1=961541&r2=961542&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  8 00:10:38 2010
@@ -149,6 +149,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1838. Reduce the time needed for raiding a bunch of files
     by randomly assigning files to map tasks. (Ramkumar Vadali via dhruba)
 
+    MAPREDUCE-1820. Fix InputSampler to clone sampled keys. (Alex Kozlov via
+    cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=961542&r1=961541&r2=961542&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Thu Jul  8 00:10:38 2010
@@ -26,7 +26,6 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -64,7 +63,7 @@ public class InputSampler<K,V> extends C
       "      [-inFormat <input format class>]\n" +
       "      [-keyClass <map input & output key class>]\n" +
       "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
-      "// Sample from random splits at random (general)\n" +
+      "             // Sample from random splits at random (general)\n" +
       "       -splitSample <numSamples> <maxsplits> | " +
       "             // Sample from first records in splits (random data)\n"+
       "       -splitInterval <double pcnt> <maxsplits>]" +
@@ -130,16 +129,17 @@ public class InputSampler<K,V> extends C
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>(numSamples);
       int splitsToSample = Math.min(maxSplitsSampled, splits.size());
-      int splitStep = splits.size() / splitsToSample;
       int samplesPerSplit = numSamples / splitsToSample;
       long records = 0;
       for (int i = 0; i < splitsToSample; ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
+            job.getConfiguration(), new TaskAttemptID());
         RecordReader<K,V> reader = inf.createRecordReader(
-          splits.get(i * splitStep), 
-          new TaskAttemptContextImpl(job.getConfiguration(), 
-                                     new TaskAttemptID()));
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
         while (reader.nextKeyValue()) {
-          samples.add(reader.getCurrentKey());
+          samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                                           reader.getCurrentKey(), null));
           ++records;
           if ((i+1) * samplesPerSplit <= records) {
             break;
@@ -214,13 +214,16 @@ public class InputSampler<K,V> extends C
       // the target sample keyset
       for (int i = 0; i < splitsToSample ||
                      (i < splits.size() && samples.size() < numSamples); ++i) {
-        RecordReader<K,V> reader = inf.createRecordReader(splits.get(i), 
-          new TaskAttemptContextImpl(job.getConfiguration(), 
-                                     new TaskAttemptID()));
+        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
+            job.getConfiguration(), new TaskAttemptID());
+        RecordReader<K,V> reader = inf.createRecordReader(
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
         while (reader.nextKeyValue()) {
           if (r.nextDouble() <= freq) {
             if (samples.size() < numSamples) {
-              samples.add(reader.getCurrentKey());
+              samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                                               reader.getCurrentKey(), null));
             } else {
               // When exceeding the maximum number of samples, replace a
               // random element with this one, then adjust the frequency
@@ -228,7 +231,8 @@ public class InputSampler<K,V> extends C
               // pushed out
               int ind = r.nextInt(numSamples);
               if (ind != numSamples) {
-                samples.set(ind, reader.getCurrentKey());
+                samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
+                                 reader.getCurrentKey(), null));
               }
               freq *= (numSamples - 1) / (double) numSamples;
             }
@@ -278,19 +282,20 @@ public class InputSampler<K,V> extends C
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>();
       int splitsToSample = Math.min(maxSplitsSampled, splits.size());
-      int splitStep = splits.size() / splitsToSample;
       long records = 0;
       long kept = 0;
       for (int i = 0; i < splitsToSample; ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
+            job.getConfiguration(), new TaskAttemptID());
         RecordReader<K,V> reader = inf.createRecordReader(
-          splits.get(i * splitStep),
-          new TaskAttemptContextImpl(job.getConfiguration(), 
-                                     new TaskAttemptID()));
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
         while (reader.nextKeyValue()) {
           ++records;
           if ((double) kept / records < freq) {
+            samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                                 reader.getCurrentKey(), null));
             ++kept;
-            samples.add(reader.getCurrentKey());
           }
         }
         reader.close();

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=961542&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Thu Jul  8 00:10:38 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TestInputSampler {
+
+  static class SequentialSplit extends InputSplit {
+    private int i;
+    SequentialSplit(int i) {
+      this.i = i;
+    }
+    public long getLength() { return 0; }
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+  }
+
+  static class TestInputSamplerIF
+      extends InputFormat<IntWritable,NullWritable> {
+
+    final int maxDepth;
+    final ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+    TestInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      this.maxDepth = maxDepth;
+      assert splitInit.length == numSplits;
+      for (int i = 0; i < numSplits; ++i) {
+        splits.add(new SequentialSplit(splitInit[i]));
+      }
+    }
+
+    public List<InputSplit> getSplits(JobContext context)
+        throws IOException, InterruptedException {
+      return splits;
+    }
+
+    public RecordReader<IntWritable,NullWritable> createRecordReader(
+        final InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new RecordReader<IntWritable,NullWritable>() {
+        private int maxVal;
+        private final IntWritable i = new IntWritable();
+        public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+          i.set(((SequentialSplit)split).getInit() - 1);
+          maxVal = i.get() + maxDepth + 1;
+        }
+        public boolean nextKeyValue() {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        public IntWritable getCurrentKey() { return i; }
+        public NullWritable getCurrentValue() { return NullWritable.get(); }
+        public float getProgress() { return 1.0f; }
+        public void close() { }
+      };
+    }
+
+  }
+
+  /**
+   * Verify SplitSampler contract, that an equal number of records are taken
+   * from the first splits.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    InputSampler.Sampler<IntWritable,NullWritable> sampler =
+      new InputSampler.SplitSampler<IntWritable,NullWritable>(
+          NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(
+        new TestInputSamplerIF(100000, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i, ((IntWritable)samples[i]).get());
+    }
+  }
+
+  /**
+   * Verify IntervalSampler contract, that samples are taken at regular
+   * intervals from the given splits.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    InputSampler.Sampler<IntWritable,NullWritable> sampler =
+      new InputSampler.IntervalSampler<IntWritable,NullWritable>(
+          FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+          NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i, ((IntWritable)samples[i]).get());
+    }
+  }
+
+}