You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/06 23:27:16 UTC
[2/2] git commit: TEZ-901. Improvements to Counters generated by
runtime components. (sseth)
TEZ-901. Improvements to Counters generated by runtime components.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/351a6105
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/351a6105
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/351a6105
Branch: refs/heads/master
Commit: 351a610589ef6f47bf00937c7f3289f4c4c1a204
Parents: 36e74ea
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 6 14:26:58 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 6 14:26:58 2014 -0800
----------------------------------------------------------------------
.../apache/tez/common/counters/TaskCounter.java | 151 +++++++++++++++----
.../org/apache/tez/mapreduce/input/MRInput.java | 48 +-----
.../apache/tez/mapreduce/output/MROutput.java | 54 +------
.../apache/tez/mapreduce/processor/MRTask.java | 9 +-
.../mapreduce/processor/map/MapProcessor.java | 13 --
.../processor/reduce/ReduceProcessor.java | 11 --
.../processor/map/TestMapProcessor.java | 2 +-
.../broadcast/input/BroadcastKVReader.java | 13 +-
.../input/BroadcastShuffleManager.java | 44 +++++-
.../broadcast/output/FileBasedKVWriter.java | 27 +++-
.../runtime/library/common/ValuesIterator.java | 2 +
.../common/localshuffle/LocalShuffle.java | 2 +-
.../library/common/shuffle/impl/Fetcher.java | 4 +-
.../common/shuffle/impl/InMemoryReader.java | 2 +-
.../common/shuffle/impl/InMemoryWriter.java | 4 +-
.../common/shuffle/impl/MergeManager.java | 76 ++++++++--
.../library/common/shuffle/impl/Shuffle.java | 21 ++-
.../common/shuffle/impl/ShuffleScheduler.java | 25 ++-
.../common/sort/impl/ExternalSorter.java | 41 +++--
.../runtime/library/common/sort/impl/IFile.java | 46 ++++--
.../common/sort/impl/PipelinedSorter.java | 8 +-
.../library/common/sort/impl/TezMerger.java | 64 +++++---
.../common/sort/impl/dflt/DefaultSorter.java | 32 +++-
.../library/input/ShuffledMergedInput.java | 2 +-
.../library/input/ShuffledUnorderedKVInput.java | 6 +-
.../library/output/OnFileSortedOutput.java | 2 +-
.../runtime/library/shuffle/common/Fetcher.java | 2 +-
.../library/shuffle/common/FetcherCallback.java | 4 +-
.../library/common/sort/impl/TestIFile.java | 6 +-
29 files changed, 467 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index b6fca27..47107c3 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -19,48 +19,143 @@
package org.apache.tez.common.counters;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-// TODO TEZAM5 For MR compatibility, a conversion from tez.TaskCounters to
-// mapreduce.TaskCounters will likely be required somewhere.
-// Similarly for FileSystemCounters and others.
// Counters used by Task classes
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
+
public enum TaskCounter {
// TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
- MAP_INPUT_RECORDS,
- MAP_OUTPUT_RECORDS,
- MAP_SKIPPED_RECORDS,
- MAP_OUTPUT_BYTES,
- MAP_OUTPUT_MATERIALIZED_BYTES,
- SPLIT_RAW_BYTES,
- COMBINE_INPUT_RECORDS,
- COMBINE_OUTPUT_RECORDS,
+
+ /**
+ * Number of Input Groups seen by ShuffledMergedInput.
+ * Alternately the number of Input Groups seen by a Reduce task.
+ */
REDUCE_INPUT_GROUPS,
- REDUCE_SHUFFLE_BYTES,
+
+ /**
+ * Number of records (across all Groups) seen by ShuffledMergedInput
+ * Alternately number of records seen by a ReduceProcessor
+ */
REDUCE_INPUT_RECORDS,
- REDUCE_OUTPUT_RECORDS,
- REDUCE_SKIPPED_GROUPS,
- REDUCE_SKIPPED_RECORDS,
+
+ REDUCE_OUTPUT_RECORDS, // Not used at the moment.
+ REDUCE_SKIPPED_GROUPS, // Not used at the moment.
+ REDUCE_SKIPPED_RECORDS, // Not used at the moment.
+ SPLIT_RAW_BYTES,
+
+ COMBINE_INPUT_RECORDS,
+ COMBINE_OUTPUT_RECORDS, // Not used at the moment.
+
+ /**
+ * Number of records written to disk in case of OnFileSortedOutput.
+ *
+ * Number of additional records written out to disk in case of
+ * ShuffledMergedInput; this represents the number of unnecessary spills to
+ * disk caused by lack of memory.
+ */
SPILLED_RECORDS,
- SHUFFLED_MAPS,
- FAILED_SHUFFLE,
+
+ /**
+ * Number of Inputs from which data is copied. Represents physical Inputs.
+ */
+ NUM_SHUFFLED_INPUTS,
+
+ /**
+ * Number of failed copy attempts (physical inputs)
+ */
+ NUM_FAILED_SHUFFLE_INPUTS,
+
MERGED_MAP_OUTPUTS,
GC_TIME_MILLIS,
CPU_MILLISECONDS,
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES,
COMMITTED_HEAP_BYTES,
+
+ /**
+ * Represents the number of Input Records that were actually processed.
+ * Used by MRInput and ShuffledUnorderedKVInput
+ *
+ */
+ INPUT_RECORDS_PROCESSED,
- INPUT_RECORDS,
+ //
+ /**
+ * Represents the number of actual output records.
+ * Used by MROutput, OnFileSortedOutput, and OnFileUnorderedKVOutput
+ */
OUTPUT_RECORDS,
- SKIPPED_RECORDS,
+
+ SKIPPED_RECORDS, // Not used at the moment.
+
+ /**
+ * Represents the serialized output size (uncompressed) of data being written.
+ */
OUTPUT_BYTES,
- OUTPUT_MATERIALIZED_BYTES,
- INPUT_GROUPS,
+
+ /**
+ * Represents serialized output size (uncompressed) along with any overhead
+ * added by the format being used.
+ */
+ OUTPUT_BYTES_WITH_OVERHEAD,
+
+ /**
+ * Represents the actual physical size of the Output generated. This factors
+ * in Compression if it is enabled. (Will include actual serialized output
+ * size + overhead)
+ */
+ OUTPUT_BYTES_PHYSICAL,
+
+ /**
+ * Bytes written to disk due to unnecessary spills (lack of adequate memory).
+ * Used by OnFileSortedOutput and ShuffledMergedInput
+ */
+ ADDITIONAL_SPILLS_BYTES_WRITTEN,
+
+ /**
+ * Bytes read from disk due to previous spills (lack of adequate memory).
+ * Used by OnFileSortedOutput and ShuffledMergedInput
+ */
+ ADDITIONAL_SPILLS_BYTES_READ,
+
+ /**
+ * Actual number of unnecessary spills. (lack of adequate memory)
+ * Used by OnFileSortedOutput
+ */
+ ADDITIONAL_SPILL_COUNT,
+
+ INPUT_GROUPS, // Not used at the moment. Will eventually replace REDUCE_INPUT_GROUPS
+
+ /**
+ * Amount of physical data moved over the wire. Used by Shuffled*Input. Should
+ * be a combination of SHUFFLE_BYTES_TO_MEM and SHUFFLE_BYTES_TO_DISK
+ */
SHUFFLE_BYTES,
- SHUFFLED_TASKS,
- MERGED_TASK_OUTPUTS,
-}
+
+ /**
+ * Uncompressed size of the data being processed by the relevant Shuffle.
+ * Includes serialization, file format etc overheads.
+ */
+ SHUFFLE_BYTES_DECOMPRESSED,
+
+ /**
+ * Number of bytes which were shuffled directly to memory.
+ */
+ SHUFFLE_BYTES_TO_MEM,
+
+ /**
+ * Number of bytes which were shuffled directly to disk
+ */
+ SHUFFLE_BYTES_TO_DISK,
+
+ /**
+ * Number of Memory to Disk merges performed during sort-merge.
+ * Used by ShuffledMergedInput
+ */
+ NUM_MEM_TO_DISK_MERGES,
+
+ /**
+ * Number of disk to disk merges performed during the sort-merge
+ */
+ NUM_DISK_TO_DISK_MERGES,
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 34b5527..1ac0295 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.Deserializer;
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
@@ -52,7 +50,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
@@ -111,8 +108,7 @@ public class MRInput implements LogicalInput {
protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
private TezCounter inputRecordCounter;
- private TezCounter fileInputByteCounter;
- private List<Statistics> fsStats;
+ // Potential counters - #splits, #totalSize, #actualBytesRead
@Private
volatile boolean splitInfoViaEvents;
@@ -148,9 +144,9 @@ public class MRInput implements LogicalInput {
// TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
// theory, can be used by the MapProcessor, ReduceProcessor or a custom
// processor. (The processor could provide the counter though)
- this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
- this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
-
+
+ this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+
useNewApi = this.jobConf.getUseNewMapper();
this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
@@ -206,17 +202,8 @@ public class MRInput implements LogicalInput {
private void setupOldRecordReader() throws IOException {
Preconditions.checkNotNull(oldInputSplit, "Input split hasn't yet been setup");
- List<Statistics> matchedStats = null;
- if (oldInputSplit instanceof FileSplit) {
- matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
- }
- fsStats = matchedStats;
-
- long bytesInPrev = getInputBytes();
oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
this.jobConf, new MRReporter(inputContext, oldInputSplit));
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
setIncrementalConfigParams(oldInputSplit);
}
@@ -233,15 +220,7 @@ public class MRInput implements LogicalInput {
}
private void setupNewRecordReader() throws IOException {
- Preconditions.checkNotNull(newInputSplit, "Input split hasn't yet been setup");
- List<Statistics> matchedStats = null;
- if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
- matchedStats = Utils.getFsStatistics(
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
- newInputSplit).getPath(), this.jobConf);
- }
- fsStats = matchedStats;
-
+ Preconditions.checkNotNull(newInputSplit, "Input split hasn't yet been setup");
try {
newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
newRecordReader.initialize(newInputSplit, taskAttemptContext);
@@ -295,15 +274,11 @@ public class MRInput implements LogicalInput {
@Override
public List<Event> close() throws IOException {
- long bytesInPrev = getInputBytes();
if (useNewApi) {
newRecordReader.close();
} else {
oldRecordReader.close();
}
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-
return null;
}
@@ -493,15 +468,6 @@ public class MRInput implements LogicalInput {
LOG.info("Processing split: " + inputSplit);
}
- private long getInputBytes() {
- if (fsStats == null) return 0;
- long bytesRead = 0;
- for (Statistics stat: fsStats) {
- bytesRead = bytesRead + stat.getBytesRead();
- }
- return bytesRead;
- }
-
protected TaskSplitMetaInfo[] readSplits(Configuration conf)
throws IOException {
TaskSplitMetaInfo[] allTaskSplitMetaInfo;
@@ -533,7 +499,6 @@ public class MRInput implements LogicalInput {
@Override
public boolean next() throws IOException {
boolean hasNext = false;
- long bytesInPrev = getInputBytes();
if (localNewApi) {
try {
hasNext = newRecordReader.nextKeyValue();
@@ -544,9 +509,6 @@ public class MRInput implements LogicalInput {
} else {
hasNext = oldRecordReader.next(key, value);
}
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-
if (hasNext) {
inputRecordCounter.increment(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 25a1e0f..2ecf602 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -37,13 +36,11 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
@@ -77,8 +74,6 @@ public class MROutput implements LogicalOutput {
org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
private TezCounter outputRecordCounter;
- private TezCounter fileOutputByteCounter;
- private List<Statistics> fsStats;
private TaskAttemptContext newApiTaskAttemptContext;
private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
@@ -123,10 +118,7 @@ public class MROutput implements LogicalOutput {
}
}
- outputRecordCounter = outputContext.getCounters().findCounter(
- TaskCounter.MAP_OUTPUT_RECORDS);
- fileOutputByteCounter = outputContext.getCounters().findCounter(
- FileOutputFormatCounter.BYTES_WRITTEN);
+ outputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
if (useNewApi) {
newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
@@ -138,26 +130,12 @@ public class MROutput implements LogicalOutput {
throw new IOException(cnfe);
}
- List<Statistics> matchedStats = null;
- if (newOutputFormat instanceof
- org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
- matchedStats =
- Utils.getFsStatistics(
- org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
- .getOutputPath(newApiTaskAttemptContext),
- jobConf);
- }
- fsStats = matchedStats;
-
- long bytesOutPrev = getOutputBytes();
try {
newRecordWriter =
newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
} catch (InterruptedException e) {
throw new IOException("Interrupted while creating record writer", e);
}
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
} else {
oldApiTaskAttemptContext =
new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
@@ -165,26 +143,12 @@ public class MROutput implements LogicalOutput {
new MRTaskReporter(outputContext));
oldOutputFormat = jobConf.getOutputFormat();
- List<Statistics> matchedStats = null;
- if (oldOutputFormat
- instanceof org.apache.hadoop.mapred.FileOutputFormat) {
- matchedStats =
- Utils.getFsStatistics(
- org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
- jobConf),
- jobConf);
- }
- fsStats = matchedStats;
-
FileSystem fs = FileSystem.get(jobConf);
String finalName = getOutputName();
- long bytesOutPrev = getOutputBytes();
oldRecordWriter =
oldOutputFormat.getRecordWriter(
fs, jobConf, finalName, new MRReporter(outputContext));
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
initCommitter(jobConf, useNewApi);
@@ -248,15 +212,6 @@ public class MROutput implements LogicalOutput {
isMapperOutput, null);
}
- private long getOutputBytes() {
- if (fsStats == null) return 0;
- long bytesWritten = 0;
- for (Statistics stat: fsStats) {
- bytesWritten = bytesWritten + stat.getBytesWritten();
- }
- return bytesWritten;
- }
-
private String getOutputFileNamePrefix() {
String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
if (prefix == null) {
@@ -281,7 +236,6 @@ public class MROutput implements LogicalOutput {
@SuppressWarnings("unchecked")
@Override
public void write(Object key, Object value) throws IOException {
- long bytesOutPrev = getOutputBytes();
if (useNewWriter) {
try {
newRecordWriter.write(key, value);
@@ -292,9 +246,6 @@ public class MROutput implements LogicalOutput {
} else {
oldRecordWriter.write(key, value);
}
-
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
outputRecordCounter.increment(1);
}
};
@@ -312,7 +263,6 @@ public class MROutput implements LogicalOutput {
}
LOG.info("Closing Simple Output");
- long bytesOutPrev = getOutputBytes();
if (useNewApi) {
try {
newRecordWriter.close(newApiTaskAttemptContext);
@@ -322,8 +272,6 @@ public class MROutput implements LogicalOutput {
} else {
oldRecordWriter.close(null);
}
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
LOG.info("Closed Simple Output");
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index a5fda8c..9ab64ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -34,8 +34,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
@@ -57,14 +57,13 @@ import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskStatus.State;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
@@ -583,10 +582,6 @@ public abstract class MRTask {
}
}
- public abstract TezCounter getOutputRecordsCounter();
-
- public abstract TezCounter getInputRecordsCounter();
-
public org.apache.hadoop.mapreduce.TaskAttemptContext getTaskAttemptContext() {
return taskAttemptContext;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index b90cd11..27b52b2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
import org.apache.tez.mapreduce.input.MRInput;
@@ -386,15 +384,4 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
super.localizeConfiguration(jobConf);
jobConf.setBoolean(JobContext.TASK_ISMAP, true);
}
-
- @Override
- public TezCounter getOutputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
- }
-
- @Override
- public TezCounter getInputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 41dff33..c5ade59 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -355,14 +354,4 @@ public class ReduceProcessor extends MRTask implements LogicalIOProcessor {
jobConf.setBoolean(JobContext.TASK_ISMAP, false);
}
- @Override
- public TezCounter getOutputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
- }
-
- @Override
- public TezCounter getInputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 744d680..a8aec1f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -151,7 +151,7 @@ public class TestMapProcessor {
Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
- new IFile.Reader(localFs, mapOutputFile, null, null, false, 0, -1);
+ new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
LongWritable key = new LongWritable();
Text value = new Text();
DataInputBuffer keyBuf = new DataInputBuffer();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index da74ebd..2354257 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
@@ -53,16 +54,21 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
private final int ifileReadAheadLength;
private final int ifileBufferSize;
+ private final TezCounter inputRecordCounter;
+
private K key;
private V value;
private FetchedInput currentFetchedInput;
private IFile.Reader currentReader;
+ // TODO Remove this once per I/O counters are separated properly. Relying on
+ // the counter at the moment will generate aggregate numbers.
private int numRecordsRead = 0;
public BroadcastKVReader(BroadcastShuffleManager shuffleManager, Configuration conf,
- CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize)
+ CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
+ TezCounter inputRecordCounter)
throws IOException {
this.shuffleManager = shuffleManager;
@@ -70,6 +76,7 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
this.ifileBufferSize = ifileBufferSize;
+ this.inputRecordCounter = inputRecordCounter;
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
@@ -98,12 +105,14 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
@Override
public boolean next() throws IOException {
if (readNextFromCurrentReader()) {
+ inputRecordCounter.increment(1);
numRecordsRead++;
return true;
} else {
boolean nextInputExists = moveToNextInput();
while (nextInputExists) {
if(readNextFromCurrentReader()) {
+ inputRecordCounter.increment(1);
numRecordsRead++;
return true;
}
@@ -181,7 +190,7 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
mfi.getBytes(), 0, (int) mfi.getActualSize());
} else {
return new IFile.Reader(fetchedInput.getInputStream(),
- fetchedInput.getCompressedSize(), codec, null, ifileReadAhead,
+ fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 776f186..ca58396 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
@@ -58,6 +60,7 @@ import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.Fetcher;
import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
@@ -129,7 +132,14 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
private volatile long initialMemoryAvailable = -1l;
- // TODO NEWTEZ Add counters.
+ private final TezCounter shuffledInputsCounter;
+ private final TezCounter failedShufflesCounter;
+ private final TezCounter bytesShuffledCounter;
+ private final TezCounter decompressedDataSizeCounter;
+ private final TezCounter bytesShuffledToDiskCounter;
+ private final TezCounter bytesShuffledToMemCounter;
+
+ // TODO More counters - FetchErrors, speed?
public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
this.inputContext = inputContext;
@@ -137,6 +147,13 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
this.numInputs = numInputs;
long initalMemReq = getInitialMemoryReq();
this.inputContext.requestInitialMemory(initalMemReq, this);
+
+ this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+ this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+ this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+ this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+ this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
+ this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
}
private void configureAndStart() throws IOException {
@@ -459,9 +476,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
/////////////////// Methods from FetcherCallbackHandler
@Override
- public void fetchSucceeded(String host,
- InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
- long copyDuration) throws IOException {
+ public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
+ FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
+ throws IOException {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
@@ -480,6 +497,19 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
+
+ // Processing counters for completed and committed fetches only. Need
+ // additional counters for excessive fetches - which primarily come
+ // in after speculation or retries.
+ shuffledInputsCounter.increment(1);
+ bytesShuffledCounter.increment(fetchedBytes);
+ if (fetchedInput.getType() == Type.MEMORY) {
+ bytesShuffledToMemCounter.increment(fetchedBytes);
+ } else {
+ bytesShuffledToDiskCounter.increment(fetchedBytes);
+ }
+ decompressedDataSizeCounter.increment(decompressedLength);
+
registerCompletedInput(fetchedInput);
}
}
@@ -506,6 +536,7 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+ "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+ connectFailed);
+ failedShufflesCounter.increment(1);
if (srcAttemptIdentifier == null) {
String message = "Received fetchFailure for an unknown src (null)";
LOG.fatal(message);
@@ -602,8 +633,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
/////////////////// End of methods for walking the available inputs
@SuppressWarnings("rawtypes")
- public BroadcastKVReader createReader() throws IOException {
- return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
+ public BroadcastKVReader createReader(TezCounter inputRecordCounter) throws IOException {
+ return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength,
+ ifileBufferSize, inputRecordCounter);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 7071b87..c5e6cc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -65,6 +67,16 @@ public class FileBasedKVWriter implements KeyValueWriter {
private TezTaskOutput ouputFileManager;
private boolean closed = false;
+ // Number of output key-value pairs
+ private final TezCounter outputRecordsCounter;
+ // Number of bytes of actual output - uncompressed.
+ private final TezCounter outputBytesCounter;
+ // Size of the data with additional meta-data
+ private final TezCounter outputBytesCounterWithOverhead;
+ // Actual physical size of the data on disk.
+ private final TezCounter outputMaterializedBytesCounter;
+
+
// TODO NEWTEZ Define Counters
// Number of records
// Time waiting for a write to complete, if that's possible.
@@ -73,6 +85,11 @@ public class FileBasedKVWriter implements KeyValueWriter {
public FileBasedKVWriter(TezOutputContext outputContext, Configuration conf) throws IOException {
this.conf = conf;
+ this.outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+ this.outputBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ this.outputBytesCounterWithOverhead = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+ this.outputMaterializedBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+
this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
// Setup serialization
@@ -103,8 +120,11 @@ public class FileBasedKVWriter implements KeyValueWriter {
public boolean close() throws IOException {
this.closed = true;
this.writer.close();
- TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
- writer.getCompressedLength());
+ long rawLen = writer.getRawLength();
+ long compLen = writer.getCompressedLength();
+ outputBytesCounterWithOverhead.increment(rawLen);
+ outputMaterializedBytesCounter.increment(compLen);
+ TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
TezSpillRecord sr = new TezSpillRecord(1);
sr.putIndex(rec, 0);
@@ -118,6 +138,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
@Override
public void write(Object key, Object value) throws IOException {
this.writer.append(key, value);
+ this.outputRecordsCounter.increment(1);
numRecords++;
}
@@ -131,7 +152,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
// TODO NEWTEZ maybe use appropriate counter
this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
- codec, null);
+ codec, null, outputBytesCounter);
}
public long getRawLength() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index fef3356..ab1cf7f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -39,6 +39,7 @@ import com.google.common.base.Preconditions;
* lead to corrupt data.
*
*/
+
public class ValuesIterator<KEY,VALUE> {
protected TezRawKeyValueIterator in; //input iterator
private KEY key; // current key
@@ -176,6 +177,7 @@ public class ValuesIterator<KEY,VALUE> {
DataInputBuffer nextKeyBytes = in.getKey();
keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
nextKey = keyDeserializer.deserialize(nextKey);
+ // TODO Is a counter increment required here ?
hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
} else {
hasMoreValues = false;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
index 36723b0..fe05410 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
@@ -121,7 +121,7 @@ public class LocalShuffle {
sortFactor,
new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken
comparator,
- null, spilledRecordsCounter, null, null);
+ null, spilledRecordsCounter, null, null, null);
}
private Path[] getMapFiles()
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 4fd9b53..97b57f3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -372,6 +372,7 @@ class Fetcher extends Thread {
//Read the shuffle header
try {
ShuffleHeader header = new ShuffleHeader();
+ // TODO Review: Multiple header reads in case of status WAIT ?
header.readFields(input);
if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
throw new IllegalArgumentException(
@@ -412,6 +413,7 @@ class Fetcher extends Thread {
// Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
+ // TODO Review: Does this cause a tight loop ?
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
@@ -431,7 +433,7 @@ class Fetcher extends Thread {
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
- scheduler.copySucceeded(srcAttemptId, host, compressedLength,
+ scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
endTime - startTime, mapOutput);
// Note successful shuffle
remaining.remove(srcAttemptId);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
index 479d704..9ed90d6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
@@ -45,7 +45,7 @@ public class InMemoryReader extends Reader {
public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
byte[] data, int start, int length)
throws IOException {
- super(null, length - start, null, null, false, 0, -1);
+ super(null, length - start, null,null, null, false, 0, -1);
this.merger = merger;
this.taskAttemptId = taskAttemptId;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
index f81b28e..a9a86ff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
@@ -38,8 +38,10 @@ public class InMemoryWriter extends Writer {
private DataOutputStream out;
+ // TODO Verify and fix counters if required.
+
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
- super(null);
+ super(null, null);
this.out =
new DataOutputStream(new IFileOutputStream(arrayStream));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 2fb6b08..b9c5fba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.TezInputContext;
@@ -114,6 +115,11 @@ public class MergeManager {
private final TezCounter mergedMapOutputsCounter;
+ private final TezCounter numMemToDiskMerges;
+ private final TezCounter numDiskToDiskMerges;
+ private final TezCounter additionalBytesWritten;
+ private final TezCounter additionalBytesRead;
+
private CompressionCodec codec;
private volatile boolean finalMergeComplete = false;
@@ -149,6 +155,11 @@ public class MergeManager {
this.localFS = localFS;
this.rfs = ((LocalFileSystem)localFS).getRaw();
+
+ this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
+ this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
+ this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+ this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
}
@@ -393,6 +404,8 @@ public class MergeManager {
synchronized (inMemoryMerger) {
// Can hang if mergeThreshold is really low.
+ // TODO Can avoid spilling in case total input size is between
+ // mergeThreshold and total available size.
if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
commitMemory + " > mergeThreshold=" + mergeThreshold +
@@ -402,7 +415,8 @@ public class MergeManager {
inMemoryMerger.startMerge(inMemoryMapOutputs);
}
}
-
+
+ // This should likely run a Combiner.
if (memToMemMerger != null) {
synchronized (memToMemMerger) {
if (!memToMemMerger.isInProgress() &&
@@ -466,6 +480,9 @@ public class MergeManager {
combiner.combine(kvIter, writer);
}
+ /**
+ * Merges multiple in-memory segments into another in-memory segment
+ */
private class IntermediateMemoryToMemoryMerger
extends MergeThread<MapOutput> {
@@ -494,10 +511,13 @@ public class MergeManager {
Writer writer =
new InMemoryWriter(mergedMapOutputs.getArrayStream());
-
+
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
+ // Nothing will be materialized to disk because the sort factor is being
+ // set to the number of in memory segments.
+ // TODO Is this doing any combination ?
TezRawKeyValueIterator rIter =
TezMerger.merge(conf, rfs,
ConfigUtils.getIntermediateInputKeyClass(conf),
@@ -505,7 +525,7 @@ public class MergeManager {
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, null, null, null);
+ nullProgressable, null, null, null, null);
TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
writer.close();
@@ -518,6 +538,9 @@ public class MergeManager {
}
}
+ /**
+ * Merges multiple in-memory segments into a disk segment
+ */
private class InMemoryMerger extends MergeThread<MapOutput> {
public InMemoryMerger(MergeManager manager) {
@@ -533,6 +556,8 @@ public class MergeManager {
return;
}
+ numMemToDiskMerges.increment(1);
+
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
@@ -549,6 +574,10 @@ public class MergeManager {
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
+ // TODO Maybe track serialized vs deserialized bytes.
+
+ // All disk writes done by this merge are overhead - due to the lack of
+ // adequate memory to keep all segments in memory.
Path outputPath = mapOutputFile.getInputFileForWrite(
srcTaskIdentifier.getInputIdentifier().getInputIndex(),
mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
@@ -559,26 +588,32 @@ public class MergeManager {
new Writer(conf, rfs, outputPath,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null);
+ codec, null, null);
TezRawKeyValueIterator rIter = null;
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
+ // Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
rIter = TezMerger.merge(conf, rfs,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, spilledRecordsCounter, null, null);
+ nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
+ // spilledRecordsCounter is tracking the number of keys that will be
+ // read from each of the segments being merged - which is essentially
+ // what will be written to disk.
if (null == combiner) {
TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
} else {
+ // TODO Counters for Combine
runCombineProcessor(rIter, writer);
}
writer.close();
+ additionalBytesWritten.increment(writer.getCompressedLength());
writer = null;
LOG.info(inputContext.getUniqueIdentifier() +
@@ -602,7 +637,10 @@ public class MergeManager {
}
}
-
+
+ /**
+ * Merges multiple on-disk segments
+ */
private class OnDiskMerger extends MergeThread<Path> {
public OnDiskMerger(MergeManager manager) {
@@ -618,6 +656,7 @@ public class MergeManager {
LOG.info("No ondisk files to merge...");
return;
}
+ numDiskToDiskMerges.increment(1);
long approxOutputSize = 0;
int bytesPerSum =
@@ -643,7 +682,7 @@ public class MergeManager {
new Writer(conf, rfs, outputPath,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null);
+ codec, null, null);
TezRawKeyValueIterator iter = null;
Path tmpDir = new Path(inputContext.getUniqueIdentifier());
try {
@@ -656,8 +695,12 @@ public class MergeManager {
nullProgressable, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
+ // TODO Maybe differentiate between data written because of Merges and
+ // the finalMerge (i.e. final mem available may be different from
+ // initial merge mem)
TezMerger.writeFile(iter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
writer.close();
+ additionalBytesWritten.increment(writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
@@ -707,7 +750,7 @@ public class MergeManager {
public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
throws IOException {
- super(null, size, null, spilledRecordsCounter, ifileReadAhead,
+ super(null, size, null, spilledRecordsCounter, null, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize);
this.kvIter = kvIter;
}
@@ -782,11 +825,11 @@ public class MergeManager {
mapOutputFile.getInputFileForWrite(srcTaskId,
inMemToDiskBytes).suffix(
Constants.MERGED_OUTPUT_PREFIX);
- final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
- keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
+ final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
+ memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
+ spilledRecordsCounter, null, additionalBytesRead, null);
final Writer writer = new Writer(job, fs, outputPath,
- keyClass, valueClass, codec, null);
+ keyClass, valueClass, codec, null, null);
try {
TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
// add to list of final disk outputs.
@@ -803,6 +846,7 @@ public class MergeManager {
} finally {
if (null != writer) {
writer.close();
+ additionalBytesWritten.increment(writer.getCompressedLength());
}
}
LOG.info("Merged " + numMemDiskSegments + " segments, " +
@@ -856,18 +900,18 @@ public class MergeManager {
TezRawKeyValueIterator diskMerge = TezMerger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
- nullProgressable, false, spilledRecordsCounter, null, null);
+ nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
}
finalSegments.add(new Segment(
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
- }
+ }
+ // This is doing nothing but creating an iterator over the segments.
return TezMerger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, nullProgressable, spilledRecordsCounter, null,
- null);
-
+ additionalBytesRead, null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 8653b44..583e1a1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -129,18 +129,24 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
// TODO TEZ Get rid of Map / Reduce references.
- TezCounter shuffledMapsCounter =
- inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
+ TezCounter shuffledInputsCounter =
+ inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
TezCounter reduceShuffleBytes =
- inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+ inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+ TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
+ TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
TezCounter failedShuffleCounter =
- inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
+ inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
TezCounter spilledRecordsCounter =
inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
TezCounter reduceCombineInputCounter =
inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
TezCounter mergedMapOutputsCounter =
inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+ TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
+ TaskCounter.SHUFFLE_BYTES_TO_DISK);
+ TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
+ TaskCounter.SHUFFLE_BYTES_TO_MEM);
LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+ (codec == null ? "None" : codec.getClass().getName()) +
@@ -151,9 +157,12 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
this.conf,
this.numInputs,
this,
- shuffledMapsCounter,
+ shuffledInputsCounter,
reduceShuffleBytes,
- failedShuffleCounter);
+ reduceDataSizeDecompressed,
+ failedShuffleCounter,
+ bytesShuffedToDisk,
+ bytesShuffedToMem);
eventHandler= new ShuffleInputEventHandler(
inputContext,
scheduler);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 9106f95..b33b838 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -45,6 +45,7 @@ import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
import com.google.common.collect.Lists;
@@ -82,7 +83,10 @@ class ShuffleScheduler {
private final int abortFailureLimit;
private final TezCounter shuffledMapsCounter;
private final TezCounter reduceShuffleBytes;
+ private final TezCounter reduceBytesDecompressed;
private final TezCounter failedShuffleCounter;
+ private final TezCounter bytesShuffledToDisk;
+ private final TezCounter bytesShuffledToMem;
private final long startTime;
private long lastProgressTime;
@@ -102,7 +106,10 @@ class ShuffleScheduler {
Shuffle shuffle,
TezCounter shuffledMapsCounter,
TezCounter reduceShuffleBytes,
- TezCounter failedShuffleCounter) {
+ TezCounter reduceBytesDecompressed,
+ TezCounter failedShuffleCounter,
+ TezCounter bytesShuffledToDisk,
+ TezCounter bytesShuffledToMem) {
this.inputContext = inputContext;
this.numInputs = numberOfInputs;
abortFailureLimit = Math.max(30, numberOfInputs / 10);
@@ -111,7 +118,10 @@ class ShuffleScheduler {
this.shuffle = shuffle;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
+ this.reduceBytesDecompressed = reduceBytesDecompressed;
this.failedShuffleCounter = failedShuffleCounter;
+ this.bytesShuffledToDisk = bytesShuffledToDisk;
+ this.bytesShuffledToMem = bytesShuffledToMem;
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
this.maxFailedUniqueFetches = Math.min(numberOfInputs,
@@ -129,7 +139,8 @@ class ShuffleScheduler {
public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
MapHost host,
- long bytes,
+ long bytesCompressed,
+ long bytesDecompressed,
long milis,
MapOutput output
) throws IOException {
@@ -147,9 +158,15 @@ class ShuffleScheduler {
// update the status
lastProgressTime = System.currentTimeMillis();
- totalBytesShuffledTillNow += bytes;
+ totalBytesShuffledTillNow += bytesCompressed;
logProgress();
- reduceShuffleBytes.increment(bytes);
+ reduceShuffleBytes.increment(bytesCompressed);
+ reduceBytesDecompressed.increment(bytesDecompressed);
+ if (output.getType() == Type.DISK) {
+ bytesShuffledToDisk.increment(bytesCompressed);
+ } else {
+ bytesShuffledToMem.increment(bytesCompressed);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("src task: "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 3e3b25f..ab8a869 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -94,11 +94,31 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
protected CompressionCodec codec;
// Counters
- // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
+ // MR compatibility layer needs to rename counters back to what MR requires.
+
+ // Represents final deserialized size of output (spills are not counted)
protected TezCounter mapOutputByteCounter;
+ // Represents final number of records written (spills are not counted)
protected TezCounter mapOutputRecordCounter;
+ // Represents the size of the final output - with any overheads introduced by
+ // the storage/serialization mechanism. This is an uncompressed data size.
+ protected TezCounter outputBytesWithOverheadCounter;
+ // Represents the size of the final output - which will be transmitted over
+ // the wire (spills are not counted). Factors in compression if it is enabled.
protected TezCounter fileOutputByteCounter;
+ // Represents total number of records written to disk (includes spills. Min
+ // value for this is equal to number of output records)
protected TezCounter spilledRecordsCounter;
+ // Bytes written as a result of additional spills. The single spill for the
+ // final output data is not considered. (This will be 0 if there are no
+ // additional spills. Compressed size - so may not represent the size in the
+ // sort buffer)
+ protected TezCounter additionalSpillBytesWritten;
+
+ protected TezCounter additionalSpillBytesRead;
+ // Number of additional spills. (This will be 0 if there are no additional
+ // spills)
+ protected TezCounter numAdditionalSpills;
@Private
public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
@@ -131,15 +151,16 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
keySerializer = serializationFactory.getSerializer(keyClass);
valSerializer = serializationFactory.getSerializer(valClass);
- // counters
- mapOutputByteCounter =
- outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
- mapOutputRecordCounter =
- outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
- fileOutputByteCounter =
- outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
- spilledRecordsCounter =
- outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+ // counters
+ mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ mapOutputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+ outputBytesWithOverheadCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+ fileOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+ spilledRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+ additionalSpillBytesWritten = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+ additionalSpillBytesRead = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+ numAdditionalSpills = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+
// compression
if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
Class<? extends CompressionCodec> codecClass =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index bdafdd0..e7545fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -85,6 +85,7 @@ public class IFile {
// Count records written to disk
private long numRecordsWritten = 0;
private final TezCounter writtenRecordsCounter;
+ private final TezCounter serializedUncompressedBytes;
IFileOutputStream checksumOut;
@@ -102,21 +103,24 @@ public class IFile {
public Writer(Configuration conf, FileSystem fs, Path file,
Class keyClass, Class valueClass,
CompressionCodec codec,
- TezCounter writesCounter) throws IOException {
+ TezCounter writesCounter,
+ TezCounter serializedBytesCounter) throws IOException {
this(conf, fs.create(file), keyClass, valueClass, codec,
- writesCounter);
+ writesCounter, serializedBytesCounter);
ownOutputStream = true;
}
- protected Writer(TezCounter writesCounter) {
+ protected Writer(TezCounter writesCounter, TezCounter serializedBytesCounter) {
writtenRecordsCounter = writesCounter;
+ serializedUncompressedBytes = serializedBytesCounter;
}
public Writer(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valueClass,
- CompressionCodec codec, TezCounter writesCounter)
+ CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter)
throws IOException {
this.writtenRecordsCounter = writesCounter;
+ this.serializedUncompressedBytes = serializedBytesCounter;
this.checksumOut = new IFileOutputStream(out);
this.rawOut = out;
this.start = this.rawOut.getPos();
@@ -150,7 +154,7 @@ public class IFile {
public Writer(Configuration conf, FileSystem fs, Path file)
throws IOException {
- this(conf, fs, file, null, null, null, null);
+ this(conf, fs, file, null, null, null, null, null);
}
public void close() throws IOException {
@@ -237,7 +241,7 @@ public class IFile {
valueLength + " for " + value);
}
- if(rle && sameKey) {
+ if(rle && sameKey) {
WritableUtils.writeVInt(out, RLE_MARKER); // Same key as previous
WritableUtils.writeVInt(out, valueLength); // value length
out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value
@@ -245,6 +249,9 @@ public class IFile {
decompressedBytesWritten += 0 + valueLength +
WritableUtils.getVIntSize(RLE_MARKER) +
WritableUtils.getVIntSize(valueLength);
+ if (serializedUncompressedBytes != null) {
+ serializedUncompressedBytes.increment(0 + valueLength);
+ }
} else {
// Write the record out
WritableUtils.writeVInt(out, keyLength); // key length
@@ -254,6 +261,9 @@ public class IFile {
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
+ if (serializedUncompressedBytes != null) {
+ serializedUncompressedBytes.increment(keyLength + valueLength);
+ }
}
// Reset
@@ -292,6 +302,9 @@ public class IFile {
decompressedBytesWritten += 0 + valueLength
+ WritableUtils.getVIntSize(RLE_MARKER)
+ WritableUtils.getVIntSize(valueLength);
+ if (serializedUncompressedBytes != null) {
+ serializedUncompressedBytes.increment(0 + valueLength);
+ }
} else {
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
@@ -302,6 +315,9 @@ public class IFile {
decompressedBytesWritten += keyLength + valueLength
+ WritableUtils.getVIntSize(keyLength)
+ WritableUtils.getVIntSize(valueLength);
+ if (serializedUncompressedBytes != null) {
+ serializedUncompressedBytes.increment(keyLength + valueLength);
+ }
BufferUtils.copy(key, previous);
}
@@ -348,6 +364,7 @@ public class IFile {
// Count records read from disk
private long numRecordsRead = 0;
private final TezCounter readRecordsCounter;
+ private final TezCounter bytesReadCounter;
final InputStream in; // Possibly decompressed stream that we read
Decompressor decompressor;
@@ -366,6 +383,8 @@ public class IFile {
protected int currentValueLength;
byte keyBytes[] = new byte[0];
+ long startPos;
+
/**
* Construct an IFile Reader.
@@ -379,11 +398,11 @@ public class IFile {
*/
public Reader(FileSystem fs, Path file,
CompressionCodec codec,
- TezCounter readsCounter, boolean ifileReadAhead,
+ TezCounter readsCounter, TezCounter bytesReadCounter, boolean ifileReadAhead,
int ifileReadAheadLength, int bufferSize) throws IOException {
this(fs.open(file),
fs.getFileStatus(file).getLen(),
- codec, readsCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
+ codec, readsCounter, bytesReadCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
}
/**
@@ -398,10 +417,11 @@ public class IFile {
*/
public Reader(InputStream in, long length,
CompressionCodec codec,
- TezCounter readsCounter,
+ TezCounter readsCounter, TezCounter bytesReadCounter,
boolean readAhead, int readAheadLength,
int bufferSize) throws IOException {
readRecordsCounter = readsCounter;
+ this.bytesReadCounter = bytesReadCounter;
checksumIn = new IFileInputStream(in,length, readAhead, readAheadLength);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
@@ -417,6 +437,8 @@ public class IFile {
this.dataIn = new DataInputStream(this.in);
this.fileLength = length;
+ startPos = checksumIn.getPosition();
+
if (bufferSize != -1) {
this.bufferSize = bufferSize;
}
@@ -537,7 +559,7 @@ public class IFile {
public void close() throws IOException {
// Close the underlying stream
in.close();
-
+
// Release the buffer
dataIn = null;
buffer = null;
@@ -545,6 +567,10 @@ public class IFile {
readRecordsCounter.increment(numRecordsRead);
}
+ if (bytesReadCounter != null) {
+ bytesReadCounter.increment(checksumIn.getPosition() - startPos + checksumIn.getSize());
+ }
+
// Return the decompressor
if (decompressor != null) {
decompressor.reset();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 6bfa098..e6d7d31 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -90,6 +90,8 @@ public class PipelinedSorter extends ExternalSorter {
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
+ // TODO Set additional counters - total bytes written, spills etc.
+
@Override
public void start() throws IOException {
@@ -266,7 +268,7 @@ public class PipelinedSorter extends ExternalSorter {
long segmentStart = out.getPos();
Writer writer =
new Writer(conf, out, keyClass, valClass, codec,
- spilledRecordsCounter);
+ spilledRecordsCounter, null);
writer.setRLE(merger.needsRLE());
if (combiner == null) {
while(kvIter.next()) {
@@ -370,14 +372,14 @@ public class PipelinedSorter extends ExternalSorter {
new Path(uniqueIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
nullProgressable, sortSegments,
- null, spilledRecordsCounter,
+ null, spilledRecordsCounter, null,
null); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
long segmentStart = finalOut.getPos();
Writer writer =
new Writer(conf, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter);
+ spilledRecordsCounter, null);
writer.setRLE(merger.needsRLE());
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);