You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/02/08 01:56:29 UTC
svn commit: r1241721 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...
Author: sseth
Date: Wed Feb 8 00:56:28 2012
New Revision: 1241721
URL: http://svn.apache.org/viewvc?rev=1241721&view=rev
Log:
MAPREDUCE-3822. Changed FS counter computation to use all occurrences of the same FS scheme, instead of randomly using one. (Contributed by Mahadev Konar)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Counter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1241721&r1=1241720&r2=1241721&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb 8 00:56:28 2012
@@ -767,6 +767,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for
performance. (vinodkv via acmurthy)
+ MAPREDUCE-3822. Changed FS counter computation to use all occurences of
+ the same FS scheme, instead of randomly using one. (Mahadev Konar via
+ sseth)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1241721&r1=1241720&r2=1241721&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Feb 8 00:56:28 2012
@@ -141,7 +141,7 @@ class MapTask extends Task {
private TaskReporter reporter;
private long bytesInPrev = -1;
private long bytesInCurr = -1;
- private final Statistics fsStats;
+ private final List<Statistics> fsStats;
TrackedRecordReader(TaskReporter reporter, JobConf job)
throws IOException{
@@ -149,7 +149,7 @@ class MapTask extends Task {
fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
this.reporter = reporter;
- Statistics matchedStats = null;
+ List<Statistics> matchedStats = null;
if (this.reporter.getInputSplit() instanceof FileSplit) {
matchedStats = getFsStatistics(((FileSplit) this.reporter
.getInputSplit()).getPath(), job);
@@ -210,8 +210,13 @@ class MapTask extends Task {
return reporter;
}
- private long getInputBytes(Statistics stats) {
- return stats == null ? 0 : stats.getBytesRead();
+ private long getInputBytes(List<Statistics> stats) {
+ if (stats == null) return 0;
+ long bytesRead = 0;
+ for (Statistics stat: stats) {
+ bytesRead = bytesRead + stat.getBytesRead();
+ }
+ return bytesRead;
}
}
@@ -426,7 +431,7 @@ class MapTask extends Task {
private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
private final TaskReporter reporter;
- private final Statistics fsStats;
+ private final List<Statistics> fsStats;
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
@@ -439,7 +444,7 @@ class MapTask extends Task {
this.fileInputByteCounter = reporter
.getCounter(FileInputFormatCounter.BYTES_READ);
- Statistics matchedStats = null;
+ List <Statistics> matchedStats = null;
if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
.getPath(), taskContext.getConfiguration());
@@ -498,8 +503,13 @@ class MapTask extends Task {
return result;
}
- private long getInputBytes(Statistics stats) {
- return stats == null ? 0 : stats.getBytesRead();
+ private long getInputBytes(List<Statistics> stats) {
+ if (stats == null) return 0;
+ long bytesRead = 0;
+ for (Statistics stat: stats) {
+ bytesRead = bytesRead + stat.getBytesRead();
+ }
+ return bytesRead;
}
}
@@ -554,7 +564,7 @@ class MapTask extends Task {
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter fileOutputByteCounter;
- private final Statistics fsStats;
+ private final List<Statistics> fsStats;
@SuppressWarnings("unchecked")
NewDirectOutputCollector(MRJobConfig jobContext,
@@ -566,7 +576,7 @@ class MapTask extends Task {
fileOutputByteCounter = reporter
.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
- Statistics matchedStats = null;
+ List<Statistics> matchedStats = null;
if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
.getOutputPath(taskContext), taskContext.getConfiguration());
@@ -603,8 +613,13 @@ class MapTask extends Task {
}
}
- private long getOutputBytes(Statistics stats) {
- return stats == null ? 0 : stats.getBytesWritten();
+ private long getOutputBytes(List<Statistics> stats) {
+ if (stats == null) return 0;
+ long bytesWritten = 0;
+ for (Statistics stat: stats) {
+ bytesWritten = bytesWritten + stat.getBytesWritten();
+ }
+ return bytesWritten;
}
}
@@ -735,7 +750,7 @@ class MapTask extends Task {
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter fileOutputByteCounter;
- private final Statistics fsStats;
+ private final List<Statistics> fsStats;
@SuppressWarnings("unchecked")
public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
@@ -750,7 +765,7 @@ class MapTask extends Task {
fileOutputByteCounter = reporter
.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
- Statistics matchedStats = null;
+ List<Statistics> matchedStats = null;
if (outputFormat instanceof FileOutputFormat) {
matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
}
@@ -785,8 +800,13 @@ class MapTask extends Task {
mapOutputRecordCounter.increment(1);
}
- private long getOutputBytes(Statistics stats) {
- return stats == null ? 0 : stats.getBytesWritten();
+ private long getOutputBytes(List<Statistics> stats) {
+ if (stats == null) return 0;
+ long bytesWritten = 0;
+ for (Statistics stat: stats) {
+ bytesWritten = bytesWritten + stat.getBytesWritten();
+ }
+ return bytesWritten;
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1241721&r1=1241720&r2=1241721&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Wed Feb 8 00:56:28 2012
@@ -476,14 +476,14 @@ public class ReduceTask extends Task {
private final RecordWriter<K, V> real;
private final org.apache.hadoop.mapred.Counters.Counter reduceOutputCounter;
private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter;
- private final Statistics fsStats;
+ private final List<Statistics> fsStats;
@SuppressWarnings({ "deprecation", "unchecked" })
public OldTrackingRecordWriter(ReduceTask reduce, JobConf job,
TaskReporter reporter, String finalName) throws IOException {
this.reduceOutputCounter = reduce.reduceOutputCounter;
this.fileOutputByteCounter = reduce.fileOutputByteCounter;
- Statistics matchedStats = null;
+ List<Statistics> matchedStats = null;
if (job.getOutputFormat() instanceof FileOutputFormat) {
matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
}
@@ -514,8 +514,13 @@ public class ReduceTask extends Task {
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
- private long getOutputBytes(Statistics stats) {
- return stats == null ? 0 : stats.getBytesWritten();
+ private long getOutputBytes(List<Statistics> stats) {
+ if (stats == null) return 0;
+ long bytesWritten = 0;
+ for (Statistics stat: stats) {
+ bytesWritten = bytesWritten + stat.getBytesWritten();
+ }
+ return bytesWritten;
}
}
@@ -524,7 +529,7 @@ public class ReduceTask extends Task {
private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
- private final Statistics fsStats;
+ private final List<Statistics> fsStats;
@SuppressWarnings("unchecked")
NewTrackingRecordWriter(ReduceTask reduce,
@@ -533,7 +538,7 @@ public class ReduceTask extends Task {
this.outputRecordCounter = reduce.reduceOutputCounter;
this.fileOutputByteCounter = reduce.fileOutputByteCounter;
- Statistics matchedStats = null;
+ List<Statistics> matchedStats = null;
if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
.getOutputPath(taskContext), taskContext.getConfiguration());
@@ -566,8 +571,13 @@ public class ReduceTask extends Task {
outputRecordCounter.increment(1);
}
- private long getOutputBytes(Statistics stats) {
- return stats == null ? 0 : stats.getBytesWritten();
+ private long getOutputBytes(List<Statistics> stats) {
+ if (stats == null) return 0;
+ long bytesWritten = 0;
+ for (Statistics stat: stats) {
+ bytesWritten = bytesWritten + stat.getBytesWritten();
+ }
+ return bytesWritten;
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1241721&r1=1241720&r2=1241721&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Wed Feb 8 00:56:28 2012
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.text.NumberFormat;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -326,14 +327,13 @@ abstract public class Task implements Wr
* the path.
* @return a Statistics instance, or null if none is found for the scheme.
*/
- protected static Statistics getFsStatistics(Path path, Configuration conf) throws IOException {
- Statistics matchedStats = null;
+ protected static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+ List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
path = path.getFileSystem(conf).makeQualified(path);
String scheme = path.toUri().getScheme();
for (Statistics stats : FileSystem.getAllStatistics()) {
if (stats.getScheme().equals(scheme)) {
- matchedStats = stats;
- break;
+ matchedStats.add(stats);
}
}
return matchedStats;
@@ -866,41 +866,53 @@ abstract public class Task implements Wr
* system and only creates the counters when they are needed.
*/
class FileSystemStatisticUpdater {
- private FileSystem.Statistics stats;
+ private List<FileSystem.Statistics> stats;
private Counters.Counter readBytesCounter, writeBytesCounter,
readOpsCounter, largeReadOpsCounter, writeOpsCounter;
-
- FileSystemStatisticUpdater(FileSystem.Statistics stats) {
+ private String scheme;
+ FileSystemStatisticUpdater(List<FileSystem.Statistics> stats, String scheme) {
this.stats = stats;
+ this.scheme = scheme;
}
void updateCounters() {
- String scheme = stats.getScheme();
if (readBytesCounter == null) {
readBytesCounter = counters.findCounter(scheme,
FileSystemCounter.BYTES_READ);
}
- readBytesCounter.setValue(stats.getBytesRead());
if (writeBytesCounter == null) {
writeBytesCounter = counters.findCounter(scheme,
FileSystemCounter.BYTES_WRITTEN);
}
- writeBytesCounter.setValue(stats.getBytesWritten());
if (readOpsCounter == null) {
readOpsCounter = counters.findCounter(scheme,
FileSystemCounter.READ_OPS);
}
- readOpsCounter.setValue(stats.getReadOps());
if (largeReadOpsCounter == null) {
largeReadOpsCounter = counters.findCounter(scheme,
FileSystemCounter.LARGE_READ_OPS);
}
- largeReadOpsCounter.setValue(stats.getLargeReadOps());
if (writeOpsCounter == null) {
writeOpsCounter = counters.findCounter(scheme,
FileSystemCounter.WRITE_OPS);
}
- writeOpsCounter.setValue(stats.getWriteOps());
+ long readBytes = 0;
+ long writeBytes = 0;
+ long readOps = 0;
+ long largeReadOps = 0;
+ long writeOps = 0;
+ for (FileSystem.Statistics stat: stats) {
+ readBytes = readBytes + stat.getBytesRead();
+ writeBytes = writeBytes + stat.getBytesWritten();
+ readOps = readOps + stat.getReadOps();
+ largeReadOps = largeReadOps + stat.getLargeReadOps();
+ writeOps = writeOps + stat.getWriteOps();
+ }
+ readBytesCounter.setValue(readBytes);
+ writeBytesCounter.setValue(writeBytes);
+ readOpsCounter.setValue(readOps);
+ largeReadOpsCounter.setValue(largeReadOps);
+ writeOpsCounter.setValue(writeOps);
}
}
@@ -911,16 +923,28 @@ abstract public class Task implements Wr
new HashMap<String, FileSystemStatisticUpdater>();
private synchronized void updateCounters() {
+ Map<String, List<FileSystem.Statistics>> map = new
+ HashMap<String, List<FileSystem.Statistics>>();
for(Statistics stat: FileSystem.getAllStatistics()) {
String uriScheme = stat.getScheme();
- FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
+ if (map.containsKey(uriScheme)) {
+ List<FileSystem.Statistics> list = map.get(uriScheme);
+ list.add(stat);
+ } else {
+ List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
+ list.add(stat);
+ map.put(uriScheme, list);
+ }
+ }
+ for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
+ FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
if(updater==null) {//new FileSystem has been found in the cache
- updater = new FileSystemStatisticUpdater(stat);
- statisticUpdaters.put(uriScheme, updater);
+ updater = new FileSystemStatisticUpdater(entry.getValue(), entry.getKey());
+ statisticUpdaters.put(entry.getKey(), updater);
}
- updater.updateCounters();
+ updater.updateCounters();
}
-
+
gcUpdater.incrementGcCounter();
updateResourceCounters();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Counter.java?rev=1241721&r1=1241720&r2=1241721&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Counter.java Wed Feb 8 00:56:28 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Writable;
/**
@@ -73,6 +74,7 @@ public interface Counter extends Writabl
*/
void increment(long incr);
+ @Private
/**
* Return the underlying object if this is a facade.
* @return the undelying object.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java?rev=1241721&r1=1241720&r2=1241721&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java Wed Feb 8 00:56:28 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.counters;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
@@ -99,6 +100,7 @@ public interface CounterGroupBase<T exte
*/
void incrAllCounters(CounterGroupBase<T> rightGroup);
+ @Private
/**
* Exposes the underlying group type if a facade.
* @return the underlying object that this object is wrapping up.