You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/05/08 23:43:41 UTC

svn commit: r1480476 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/...

Author: vinodkv
Date: Wed May  8 21:43:40 2013
New Revision: 1480476

URL: http://svn.apache.org/r1480476
Log:
MAPREDUCE-5157. Bring back old sampler related code so that we can support binary compatibility with hadoop-1 sorter example. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1480474 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1480476&r1=1480475&r2=1480476&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed May  8 21:43:40 2013
@@ -62,6 +62,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
     binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5157. Bring back old sampler related code so that we can support
+    binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java?rev=1480476&r1=1480475&r2=1480476&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java Wed May  8 21:43:40 2013
@@ -19,10 +19,18 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 
 @InterfaceAudience.Public
@@ -30,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job;
 public class InputSampler<K,V> extends 
   org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
   public InputSampler(JobConf conf) {
     super(conf);
   }
@@ -38,4 +48,219 @@ public class InputSampler<K,V> extends 
       throws IOException, ClassNotFoundException, InterruptedException {
     writePartitionFile(new Job(job), sampler);
   }
+  /**
+   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   */
+  public interface Sampler<K,V> extends
+    org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K, V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      super(numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          samples.add(key);
+          key = reader.createKey();
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      super(freq, numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.length; ++i) {
+        InputSplit tmp = splits[i];
+        int j = r.nextInt(splits.length);
+        splits[i] = splits[j];
+        splits[j] = tmp;
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.length && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+            Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(key);
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, key);
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      super(freq, maxSplitsSampled);
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(key);
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=1480476&r1=1480475&r2=1480476&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Wed May  8 21:43:40 2013
@@ -96,8 +96,8 @@ public class InputSampler<K,V> extends C
    */
   public static class SplitSampler<K,V> implements Sampler<K,V> {
 
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a SplitSampler sampling <em>all</em> splits.
@@ -157,9 +157,9 @@ public class InputSampler<K,V> extends C
    * each split.
    */
   public static class RandomSampler<K,V> implements Sampler<K,V> {
-    private double freq;
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected double freq;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new RandomSampler sampling <em>all</em> splits.
@@ -249,8 +249,8 @@ public class InputSampler<K,V> extends C
    * Useful for sorted data.
    */
   public static class IntervalSampler<K,V> implements Sampler<K,V> {
-    private final double freq;
-    private final int maxSplitsSampled;
+    protected final double freq;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new IntervalSampler sampling <em>all</em> splits.

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1480476&r1=1480475&r2=1480476&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Wed May  8 21:43:40 2013
@@ -17,23 +17,26 @@
  */
 package org.apache.hadoop.mapreduce.lib.partition;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
 
 public class TestInputSampler {
 
@@ -47,6 +50,24 @@ public class TestInputSampler {
     public int getInit() { return i; }
   }
 
+  static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
+    private int i;
+    MapredSequentialSplit(int i) {
+      this.i = i;
+    }
+    @Override
+    public long getLength() { return 0; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
   static class TestInputSamplerIF
       extends InputFormat<IntWritable,NullWritable> {
 
@@ -90,6 +111,71 @@ public class TestInputSampler {
 
   }
 
+  static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
+      org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
+
+    TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      super(maxDepth, numSplits, splitInit);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
+        int numSplits) throws IOException {
+      List<InputSplit> splits = null;
+      try {
+        splits = getSplits(Job.getInstance(job));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      org.apache.hadoop.mapred.InputSplit[] retVals =
+          new org.apache.hadoop.mapred.InputSplit[splits.size()];
+      for (int i = 0; i < splits.size(); ++i) {
+        MapredSequentialSplit split = new MapredSequentialSplit(
+            ((SequentialSplit) splits.get(i)).getInit());
+        retVals[i] = split;
+      }
+      return retVals;
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
+        getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+            JobConf job, Reporter reporter) throws IOException {
+      return new org.apache.hadoop.mapred.RecordReader
+          <IntWritable, NullWritable>() {
+        private final IntWritable i =
+            new IntWritable(((MapredSequentialSplit)split).getInit());
+        private int maxVal = i.get() + maxDepth + 1;
+
+        @Override
+        public boolean next(IntWritable key, NullWritable value)
+            throws IOException {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        @Override
+        public IntWritable createKey() {
+          return new IntWritable(i.get());
+        }
+        @Override
+        public NullWritable createValue() {
+          return NullWritable.get();
+        }
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+        @Override
+        public void close() throws IOException {
+        }
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
   /**
    * Verify SplitSampler contract, that an equal number of records are taken
    * from the first splits.
@@ -119,6 +205,36 @@ public class TestInputSampler {
   }
 
   /**
+   * Verify SplitSampler contract in mapred.lib.InputSampler, which is added
+   * back for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
+            <IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Object[] samples = sampler.getSample(
+        new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
+        new JobConf());
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      // mapred.lib.InputSampler.SplitSampler has a sampling step
+      assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
+  /**
    * Verify IntervalSampler contract, that samples are taken at regular
    * intervals from the given splits.
    */
@@ -146,4 +262,33 @@ public class TestInputSampler {
     }
   }
 
+  /**
+   * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
+   * for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
+            <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+          NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i,
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
 }