You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/08 02:10:38 UTC
svn commit: r961542 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
Author: cdouglas
Date: Thu Jul 8 00:10:38 2010
New Revision: 961542
URL: http://svn.apache.org/viewvc?rev=961542&view=rev
Log:
MAPREDUCE-1820. Fix InputSampler to clone sampled keys. Contributed by Alex Kozlov
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=961542&r1=961541&r2=961542&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 8 00:10:38 2010
@@ -149,6 +149,9 @@ Trunk (unreleased changes)
MAPREDUCE-1838. Reduce the time needed for raiding a bunch of files
by randomly assigning files to map tasks. (Ramkumar Vadali via dhruba)
+ MAPREDUCE-1820. Fix InputSampler to clone sampled keys. (Alex Kozlov via
+ cdouglas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=961542&r1=961541&r2=961542&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Thu Jul 8 00:10:38 2010
@@ -26,7 +26,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -64,7 +63,7 @@ public class InputSampler<K,V> extends C
" [-inFormat <input format class>]\n" +
" [-keyClass <map input & output key class>]\n" +
" [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
- "// Sample from random splits at random (general)\n" +
+ " // Sample from random splits at random (general)\n" +
" -splitSample <numSamples> <maxsplits> | " +
" // Sample from first records in splits (random data)\n"+
" -splitInterval <double pcnt> <maxsplits>]" +
@@ -130,16 +129,17 @@ public class InputSampler<K,V> extends C
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
- int splitStep = splits.size() / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
+ TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
+ job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
- splits.get(i * splitStep),
- new TaskAttemptContextImpl(job.getConfiguration(),
- new TaskAttemptID()));
+ splits.get(i), samplingContext);
+ reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
- samples.add(reader.getCurrentKey());
+ samples.add(ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
@@ -214,13 +214,16 @@ public class InputSampler<K,V> extends C
// the target sample keyset
for (int i = 0; i < splitsToSample ||
(i < splits.size() && samples.size() < numSamples); ++i) {
- RecordReader<K,V> reader = inf.createRecordReader(splits.get(i),
- new TaskAttemptContextImpl(job.getConfiguration(),
- new TaskAttemptID()));
+ TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
+ job.getConfiguration(), new TaskAttemptID());
+ RecordReader<K,V> reader = inf.createRecordReader(
+ splits.get(i), samplingContext);
+ reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
- samples.add(reader.getCurrentKey());
+ samples.add(ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
@@ -228,7 +231,8 @@ public class InputSampler<K,V> extends C
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
- samples.set(ind, reader.getCurrentKey());
+ samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
}
freq *= (numSamples - 1) / (double) numSamples;
}
@@ -278,19 +282,20 @@ public class InputSampler<K,V> extends C
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
- int splitStep = splits.size() / splitsToSample;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
+ TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
+ job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
- splits.get(i * splitStep),
- new TaskAttemptContextImpl(job.getConfiguration(),
- new TaskAttemptID()));
+ splits.get(i), samplingContext);
+ reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
+ samples.add(ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
++kept;
- samples.add(reader.getCurrentKey());
}
}
reader.close();
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=961542&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Thu Jul 8 00:10:38 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TestInputSampler {
+
+ static class SequentialSplit extends InputSplit {
+ private int i;
+ SequentialSplit(int i) {
+ this.i = i;
+ }
+ public long getLength() { return 0; }
+ public String[] getLocations() { return new String[0]; }
+ public int getInit() { return i; }
+ }
+
+ static class TestInputSamplerIF
+ extends InputFormat<IntWritable,NullWritable> {
+
+ final int maxDepth;
+ final ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+ TestInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+ this.maxDepth = maxDepth;
+ assert splitInit.length == numSplits;
+ for (int i = 0; i < numSplits; ++i) {
+ splits.add(new SequentialSplit(splitInit[i]));
+ }
+ }
+
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ return splits;
+ }
+
+ public RecordReader<IntWritable,NullWritable> createRecordReader(
+ final InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new RecordReader<IntWritable,NullWritable>() {
+ private int maxVal;
+ private final IntWritable i = new IntWritable();
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ i.set(((SequentialSplit)split).getInit() - 1);
+ maxVal = i.get() + maxDepth + 1;
+ }
+ public boolean nextKeyValue() {
+ i.set(i.get() + 1);
+ return i.get() < maxVal;
+ }
+ public IntWritable getCurrentKey() { return i; }
+ public NullWritable getCurrentValue() { return NullWritable.get(); }
+ public float getProgress() { return 1.0f; }
+ public void close() { }
+ };
+ }
+
+ }
+
+ /**
+ * Verify the SplitSampler contract: an equal number of records is taken
+ * from each of the first NUM_SPLITS splits.
+ */
+ @Test
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testSplitSampler() throws Exception {
+ final int TOT_SPLITS = 15;
+ final int NUM_SPLITS = 5;
+ final int STEP_SAMPLE = 5;
+ final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+ InputSampler.Sampler<IntWritable,NullWritable> sampler =
+ new InputSampler.SplitSampler<IntWritable,NullWritable>(
+ NUM_SAMPLES, NUM_SPLITS);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i * STEP_SAMPLE;
+ }
+ Job ignored = Job.getInstance();
+ Object[] samples = sampler.getSample(
+ new TestInputSamplerIF(100000, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i, ((IntWritable)samples[i]).get());
+ }
+ }
+
+ /**
+ * Verify IntervalSampler contract, that samples are taken at regular
+ * intervals from the given splits.
+ */
+ @Test
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testIntervalSampler() throws Exception {
+ final int TOT_SPLITS = 16;
+ final int PER_SPLIT_SAMPLE = 4;
+ final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+ final double FREQ = 1.0 / TOT_SPLITS;
+ InputSampler.Sampler<IntWritable,NullWritable> sampler =
+ new InputSampler.IntervalSampler<IntWritable,NullWritable>(
+ FREQ, NUM_SAMPLES);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i;
+ }
+ Job ignored = Job.getInstance();
+ Object[] samples = sampler.getSample(new TestInputSamplerIF(
+ NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i, ((IntWritable)samples[i]).get());
+ }
+ }
+
+}