You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hbase.apache.org by to...@apache.org on 2011/01/04 01:52:06 UTC
svn commit: r1054845 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
Author: todd
Date: Tue Jan 4 00:52:06 2011
New Revision: 1054845
URL: http://svn.apache.org/viewvc?rev=1054845&view=rev
Log:
HBASE-3392. Update backport of InputSampler to reflect MAPREDUCE-1820
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1054845&r1=1054844&r2=1054845&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jan 4 00:52:06 2011
@@ -1297,6 +1297,7 @@ Release 0.90.0 - Unreleased
HBASE-2467 Concurrent flushers in HLog sync using HDFS-895
HBASE-3349 Pass HBase configuration to HttpServer
HBASE-3372 HRS shouldn't print a full stack for ServerNotRunningException
+ HBASE-3392 Update backport of InputSampler to reflect MAPREDUCE-1820
NEW FEATURES
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java?rev=1054845&r1=1054844&r2=1054845&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java Tue Jan 4 00:52:06 2011
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.ToolRunner
* {@link TotalOrderPartitioner}.
*
* This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
- * from Hadoop trunk at r910774, with the exception of replacing
+ * from Hadoop trunk at r961542, with the exception of replacing
* TaskAttemptContextImpl with TaskAttemptContext.
*/
public class InputSampler<K,V> extends Configured implements Tool {
@@ -63,7 +63,7 @@ public class InputSampler<K,V> extends C
" [-inFormat <input format class>]\n" +
" [-keyClass <map input & output key class>]\n" +
" [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
- "// Sample from random splits at random (general)\n" +
+ " // Sample from random splits at random (general)\n" +
" -splitSample <numSamples> <maxsplits> | " +
" // Sample from first records in splits (random data)\n"+
" -splitInterval <double pcnt> <maxsplits>]" +
@@ -129,16 +129,17 @@ public class InputSampler<K,V> extends C
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
- int splitStep = splits.size() / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
+ TaskAttemptContext samplingContext = new TaskAttemptContext(
+ job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
- splits.get(i * splitStep),
- new TaskAttemptContext(job.getConfiguration(),
- new TaskAttemptID()));
+ splits.get(i), samplingContext);
+ reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
- samples.add(reader.getCurrentKey());
+ samples.add(ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
@@ -213,13 +214,16 @@ public class InputSampler<K,V> extends C
// the target sample keyset
for (int i = 0; i < splitsToSample ||
(i < splits.size() && samples.size() < numSamples); ++i) {
- RecordReader<K,V> reader = inf.createRecordReader(splits.get(i),
- new TaskAttemptContext(job.getConfiguration(),
- new TaskAttemptID()));
+ TaskAttemptContext samplingContext = new TaskAttemptContext(
+ job.getConfiguration(), new TaskAttemptID());
+ RecordReader<K,V> reader = inf.createRecordReader(
+ splits.get(i), samplingContext);
+ reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
- samples.add(reader.getCurrentKey());
+ samples.add(ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
@@ -227,7 +231,8 @@ public class InputSampler<K,V> extends C
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
- samples.set(ind, reader.getCurrentKey());
+ samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
}
freq *= (numSamples - 1) / (double) numSamples;
}
@@ -277,19 +282,20 @@ public class InputSampler<K,V> extends C
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
- int splitStep = splits.size() / splitsToSample;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
+ TaskAttemptContext samplingContext = new TaskAttemptContext(
+ job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
- splits.get(i * splitStep),
- new TaskAttemptContext(job.getConfiguration(),
- new TaskAttemptID()));
+ splits.get(i), samplingContext);
+ reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
+ samples.add(ReflectionUtils.copy(job.getConfiguration(),
+ reader.getCurrentKey(), null));
++kept;
- samples.add(reader.getCurrentKey());
}
}
reader.close();