You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/29 01:45:55 UTC
tez git commit: TEZ-2198. Fix sorter spill counts (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master b9056657a -> 42b7756eb
TEZ-2198. Fix sorter spill counts (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/42b7756e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/42b7756e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/42b7756e
Branch: refs/heads/master
Commit: 42b7756ebea108f81dbf633c847e2f70cd099e0f
Parents: b905665
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri May 29 05:16:46 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri May 29 05:16:46 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/common/counters/TaskCounter.java | 11 ++-
.../common/sort/impl/ExternalSorter.java | 21 +++++-
.../common/sort/impl/PipelinedSorter.java | 54 +++++++++-----
.../common/sort/impl/dflt/DefaultSorter.java | 76 ++++++++++----------
.../common/sort/impl/TestPipelinedSorter.java | 58 ++++++++++++++-
.../sort/impl/dflt/TestDefaultSorter.java | 43 +++++++++--
.../apache/tez/test/TestPipelinedShuffle.java | 2 +-
8 files changed, 197 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ece14a3..7f240c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2198. Fix sorter spill counts.
TEZ-1883. Change findbugs version to 3.x.
TEZ-2440. Sorter should check for indexCacheList.size() in flush().
TEZ-2490. TEZ-2450 breaks Hadoop 2.2 and 2.4 compatability.
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 128b067..7dcdf8a 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -131,10 +131,19 @@ public enum TaskCounter {
ADDITIONAL_SPILLS_BYTES_READ,
/**
- * Actual number of unnecessary spills. (lac of adequate memory)
+ * Spills that were generated & read by the same task (unnecessary spills due to lack of
+ * adequate memory).
+ *
* Used by OnFileSortedOutput
*/
ADDITIONAL_SPILL_COUNT,
+
+ /**
+ * Number of spill files being offered via shuffle-handler.
+ * e.g. Without pipelined shuffle, this would be 1. With pipelined shuffle, this could be many,
+ * since the final merge is avoided.
+ */
+ SHUFFLE_CHUNK_COUNT,
INPUT_GROUPS, // Not used at the moment. Will eventually replace REDUCE_INPUT_GROUPS
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 40d22fe..27fe37a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
@@ -118,6 +119,9 @@ public abstract class ExternalSorter {
protected final boolean cleanup;
+ protected final boolean finalMergeEnabled;
+ protected final boolean sendEmptyPartitionDetails;
+
// Counters
// MR compatilbity layer needs to rename counters back to what MR requries.
@@ -141,9 +145,10 @@ public abstract class ExternalSorter {
protected final TezCounter additionalSpillBytesWritten;
protected final TezCounter additionalSpillBytesRead;
- // Number of additional spills. (This will be 0 if there's no additional
- // spills)
+ // Number of spills written & consumed by the same task to generate the final file
protected final TezCounter numAdditionalSpills;
+ // Number of files offered via shuffle-handler to consumers.
+ protected final TezCounter numShuffleChunks;
public ExternalSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
@@ -187,6 +192,7 @@ public abstract class ExternalSorter {
additionalSpillBytesWritten = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
additionalSpillBytesRead = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
numAdditionalSpills = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+ numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
// compression
if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
@@ -235,6 +241,17 @@ public abstract class ExternalSorter {
this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions);
this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
+ this.finalMergeEnabled = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
+ this.sendEmptyPartitionDetails = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+ TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+ }
+
+ @VisibleForTesting
+ public boolean isFinalMergeEnabled() {
+ return finalMergeEnabled;
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 9113fca..6e4d72e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -111,8 +111,6 @@ public class PipelinedSorter extends ExternalSorter {
private int indexCacheMemoryLimit;
private final boolean pipelinedShuffle;
- private final boolean finalMergeEnabled;
- private final boolean sendEmptyPartitionDetails;
// TODO Set additional countesr - total bytes written, spills etc.
@@ -127,20 +125,11 @@ public class PipelinedSorter extends ExternalSorter {
partitionBits = bitcount(partitions)+1;
- finalMergeEnabled = conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
- TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
-
boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
- sendEmptyPartitionDetails = conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
- TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-
-
- pipelinedShuffle = !finalMergeEnabled && confPipelinedShuffle;
+ pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
//sanity checks
final long sortmb = this.availableMemoryMb;
@@ -157,7 +146,7 @@ public class PipelinedSorter extends ExternalSorter {
int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
LOG.info("Number of Blocks : " + numberOfBlocks
+ ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize + ", finalMergeEnabled="
- + finalMergeEnabled + ", pipelinedShuffle=" + pipelinedShuffle + ", "
+ + isFinalMergeEnabled() + ", pipelinedShuffle=" + pipelinedShuffle + ", "
+ "sendEmptyPartitionDetails=" + sendEmptyPartitionDetails);
long totalCapacityWithoutMeta = 0;
for (int i = 0; i < numberOfBlocks; i++) {
@@ -235,7 +224,7 @@ public class PipelinedSorter extends ExternalSorter {
if (pipelinedShuffle) {
List<Event> events = Lists.newLinkedList();
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
- ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, false, outputContext,
+ ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
(numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
pathComponent);
outputContext.sendEvents(events);
@@ -343,6 +332,21 @@ public class PipelinedSorter extends ExternalSorter {
mapOutputByteCounter.increment(valend - keystart);
}
+ private void adjustSpillCounters(long rawLength, long compLength) {
+ if (!isFinalMergeEnabled()) {
+ outputBytesWithOverheadCounter.increment(rawLength);
+ } else {
+ if (numSpills > 0) {
+ additionalSpillBytesWritten.increment(compLength);
+ // Reset; the value will be set during the final merge.
+ outputBytesWithOverheadCounter.setValue(0);
+ } else {
+ // Set this up for the first write only. Subsequent ones will be handled in the final merge.
+ outputBytesWithOverheadCounter.increment(rawLength);
+ }
+ }
+ }
+
public void spill() throws IOException {
// create spill file
final long size = capacity +
@@ -381,6 +385,7 @@ public class PipelinedSorter extends ExternalSorter {
}
//close
writer.close();
+ adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
// record offsets
final TezIndexRecord rec =
@@ -399,6 +404,11 @@ public class PipelinedSorter extends ExternalSorter {
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
+ if (!isFinalMergeEnabled()) {
+ fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
+ //No final merge. Set the number of files offered via shuffle-handler
+ numShuffleChunks.setValue(numSpills);
+ }
} finally {
out.close();
}
@@ -440,14 +450,13 @@ public class PipelinedSorter extends ExternalSorter {
//safe to clean up
bufferList.clear();
- numAdditionalSpills.increment(numSpills - 1);
if(indexCacheList.isEmpty()) {
LOG.warn("Index list is empty... returning");
return;
}
- if (!finalMergeEnabled) {
+ if (!isFinalMergeEnabled()) {
//Generate events for all spills
List<Event> events = Lists.newLinkedList();
@@ -459,16 +468,17 @@ public class PipelinedSorter extends ExternalSorter {
boolean isLastEvent = (i == numSpills - 1);
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
- ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+ ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent);
LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
}
outputContext.sendEvents(events);
- //No need to generate final merge
return;
}
+ numAdditionalSpills.increment(numSpills - 1);
+
//In case final merge is required, the following code path is executed.
if (numSpills == 1) {
// someday be able to pass this directly to shuffle
@@ -485,6 +495,8 @@ public class PipelinedSorter extends ExternalSorter {
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
indexFilename);
}
+ numShuffleChunks.setValue(numSpills);
+ fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
return;
}
@@ -531,7 +543,7 @@ public class PipelinedSorter extends ExternalSorter {
new Path(uniqueIdentifier),
(RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
nullProgressable, sortSegments, true,
- null, spilledRecordsCounter, null,
+ null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
@@ -548,6 +560,7 @@ public class PipelinedSorter extends ExternalSorter {
//close
writer.close();
+ outputBytesWithOverheadCounter.increment(writer.getRawLength());
// record offsets
final TezIndexRecord rec =
@@ -558,6 +571,9 @@ public class PipelinedSorter extends ExternalSorter {
spillRec.putIndex(rec, parts);
}
+ numShuffleChunks.setValue(1); //final merge has happened.
+ fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
+
spillRec.writeToFile(finalIndexFile, conf);
finalOut.close();
for (int i = 0; i < numSpills; i++) {
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index afe07f0..ebf40f3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -120,8 +120,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
private long totalKeys = 0;
private long sameKey = 0;
- private final boolean finalMergeEnabled;
- private final boolean sendEmptyPartitionDetails;
public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
@@ -139,10 +137,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);
- finalMergeEnabled = conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
- TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
-
boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
@@ -152,10 +146,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+ "with DefaultSorter. It is supported only with PipelinedSorter.");
}
- sendEmptyPartitionDetails = conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
- TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
@@ -175,7 +165,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
- LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; finalMergeEnabled = " + finalMergeEnabled);
+ LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; finalMergeEnabled = " + isFinalMergeEnabled());
}
// k/v serialization
@@ -716,7 +706,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
cleanup();
Thread.currentThread().interrupt();
}
- if (finalMergeEnabled) {
+ if (isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
}
}
@@ -818,6 +808,22 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spill(mstart, mend);
}
+ private void adjustSpillCounters(long rawLen, long compLength) {
+ if (!isFinalMergeEnabled()) {
+ outputBytesWithOverheadCounter.increment(rawLen);
+ } else {
+ if (numSpills > 0) {
+ additionalSpillBytesWritten.increment(compLength);
+ numAdditionalSpills.increment(1);
+ // Reset; the value will be set during the final merge.
+ outputBytesWithOverheadCounter.setValue(0);
+ } else {
+ // Set this up for the first write only. Subsequent ones will be handled in the final merge.
+ outputBytesWithOverheadCounter.increment(rawLen);
+ }
+ }
+ }
+
protected void spill(int mstart, int mend)
throws IOException, InterruptedException {
@@ -879,15 +885,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
// close the writer
writer.close();
- if (numSpills > 0) {
- additionalSpillBytesWritten.increment(writer.getCompressedLength());
- numAdditionalSpills.increment(1);
- // Reset the value will be set during the final merge.
- outputBytesWithOverheadCounter.setValue(0);
- } else {
- // Set this up for the first write only. Subsequent ones will be handled in the final merge.
- outputBytesWithOverheadCounter.increment(writer.getRawLength());
- }
+ adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
@@ -916,6 +914,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
LOG.info("Finished spill " + numSpills);
++numSpills;
+ if (!isFinalMergeEnabled()) {
+ numShuffleChunks.setValue(numSpills);
+ }
} finally {
if (out != null) out.close();
}
@@ -956,14 +957,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
writer.close();
- if (numSpills > 0) {
- additionalSpillBytesWritten.increment(writer.getCompressedLength());
- numAdditionalSpills.increment(1);
- outputBytesWithOverheadCounter.setValue(0);
- } else {
- // Set this up for the first write only. Subsequent ones will be handled in the final merge.
- outputBytesWithOverheadCounter.increment(writer.getRawLength());
- }
+ adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
// record offsets
TezIndexRecord rec =
@@ -992,6 +986,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
+ if (!isFinalMergeEnabled()) {
+ numShuffleChunks.setValue(numSpills);
+ }
} finally {
if (out != null) out.close();
}
@@ -1085,13 +1082,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
private void maybeSendEventForSpill(List<Event> events, boolean isLastEvent,
TezSpillRecord spillRecord, int index, boolean sendEvent) throws IOException {
- if (finalMergeEnabled) {
+ if (isFinalMergeEnabled()) {
return;
}
Preconditions.checkArgument(spillRecord != null, "Spill record can not be null");
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
- ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+ ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent);
LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
@@ -1102,7 +1099,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
private void maybeAddEventsForSpills() throws IOException {
- if (finalMergeEnabled) {
+ if (isFinalMergeEnabled()) {
return;
}
List<Event> events = Lists.newLinkedList();
@@ -1124,7 +1121,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
maybeSendEventForSpill(events, (i == numSpills - 1), spillRecord, i, false);
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(i)).getLen());
}
-
outputContext.sendEvents(events);
}
@@ -1140,7 +1136,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
- if (finalMergeEnabled) {
+ if (isFinalMergeEnabled()) {
finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
sameVolRename(filename[0], finalOutputFile);
@@ -1160,6 +1156,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
//No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled
}
+ numShuffleChunks.setValue(numSpills);
return;
}
@@ -1170,7 +1167,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
//Check if it is needed to do final merge. Or else, exit early.
- if (numSpills > 0 && !finalMergeEnabled) {
+ if (numSpills > 0 && !isFinalMergeEnabled()) {
maybeAddEventsForSpills();
//No need to do final merge.
return;
@@ -1181,7 +1178,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- if (finalMergeEnabled) {
+ if (isFinalMergeEnabled()) {
finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
} else if (numSpills == 0) {
@@ -1219,12 +1216,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
} finally {
finalOut.close();
}
-
- if (!finalMergeEnabled) {
+ ++numSpills;
+ if (!isFinalMergeEnabled()) {
List<Event> events = Lists.newLinkedList();
maybeSendEventForSpill(events, true, sr, 0, true);
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
}
+ numShuffleChunks.setValue(numSpills);
return;
}
else {
@@ -1277,6 +1275,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
runCombineProcessor(kvIter, writer);
}
writer.close();
+ outputBytesWithOverheadCounter.increment(writer.getRawLength());
// record offsets
final TezIndexRecord rec =
@@ -1286,6 +1285,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
writer.getCompressedLength());
spillRec.putIndex(rec, parts);
}
+ numShuffleChunks.setValue(1); //final merge has happened
spillRec.writeToFile(finalIndexFile, conf);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 5de96c9..8bf91ce 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -13,6 +13,8 @@ import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
@@ -60,7 +62,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
* limitations under the License.
*/
public class TestPipelinedSorter {
- private static final Configuration conf = new Configuration();
+ private static Configuration conf = new Configuration();
private static FileSystem localFs = null;
private static Path workDir = null;
private OutputContext outputContext;
@@ -114,6 +116,7 @@ public class TestPipelinedSorter {
public void reset() throws IOException {
cleanup();
localFs.mkdirs(workDir);
+ conf = new Configuration();
}
@Test
@@ -124,6 +127,17 @@ public class TestPipelinedSorter {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
+ }
+
+ @Test
+ public void testWithEmptyData() throws IOException {
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
+ //# partition, # of keys, size per key, InitialMem, blockSize
+ basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
+ }
+
+ @Test
+ public void basicTestWithSmallBlockSize() throws IOException {
try {
//3 MB key & 3 MB value, whereas block size is just 3 MB
basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
@@ -133,11 +147,13 @@ public class TestPipelinedSorter {
ioe.getMessage().contains("Record too large for in-memory buffer."
+ " Exceeded buffer overflow limit"));
}
+ }
+ @Test
+ public void testWithLargeKeyValue() throws IOException {
//15 MB key & 15 MB value, 48 MB sort buffer. block size is 48MB (or 1 block)
//meta would be 16 MB
basicTest(1, 5, (15 << 20), (48 * 1024l * 1024l), 48 << 20);
-
}
@Test
@@ -154,7 +170,7 @@ public class TestPipelinedSorter {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 1<<20);
+ initialAvailableMem, 1 << 20);
//Write 100 keys each of size 10
writeData(sorter, 10000, 100);
@@ -172,6 +188,7 @@ public class TestPipelinedSorter {
writeData(sorter, numKeys, keySize);
+ verifyCounters(sorter, outputContext);
Path outputFile = sorter.finalOutputFile;
FileSystem fs = outputFile.getFileSystem(conf);
@@ -181,6 +198,41 @@ public class TestPipelinedSorter {
reader.close();
}
+ private void verifyCounters(PipelinedSorter sorter, OutputContext context) {
+ TezCounter numShuffleChunks = context.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+ TezCounter additionalSpills =
+ context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+ TezCounter additionalSpillBytesWritten =
+ context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+ TezCounter additionalSpillBytesRead =
+ context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+
+ if (sorter.isFinalMergeEnabled()) {
+ assertTrue(additionalSpills.getValue() == (sorter.getNumSpills() - 1));
+ //Number of files served by shuffle-handler
+ assertTrue(1 == numShuffleChunks.getValue());
+ if (sorter.getNumSpills() > 1) {
+ assertTrue(additionalSpillBytesRead.getValue() > 0);
+ assertTrue(additionalSpillBytesWritten.getValue() > 0);
+ }
+ } else {
+ assertTrue(0 == additionalSpills.getValue());
+ //Number of files served by shuffle-handler
+ assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+ assertTrue(additionalSpillBytesRead.getValue() == 0);
+ assertTrue(additionalSpillBytesWritten.getValue() == 0);
+ }
+
+ TezCounter finalOutputBytes =
+ context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+ assertTrue(finalOutputBytes.getValue() > 0);
+
+ TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
+ (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+ assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+ }
+
+
@Test
//Its not possible to allocate > 2 GB in test environment. Carry out basic checks here.
public void memTest() throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 70dce13..072eafc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.Event;
@@ -171,6 +172,7 @@ public class TestDefaultSorter {
try {
writeData(sorter, 1000, 1000);
assertTrue(sorter.getNumSpills() > 2);
+ verifyCounters(sorter, context);
} catch(IOException ioe) {
fail(ioe.getMessage());
}
@@ -191,6 +193,7 @@ public class TestDefaultSorter {
sorter.flush();
sorter.close();
assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID));
+ verifyCounters(sorter, context);
} catch(Exception e) {
fail();
}
@@ -213,13 +216,13 @@ public class TestDefaultSorter {
sorter.close();
assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID +
"_0"));
- assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+ verifyCounters(sorter, context);
} catch(Exception e) {
fail();
}
}
- @Test(timeout = 30000)
+ @Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
OutputContext context = createTezOutputContext();
@@ -245,10 +248,10 @@ public class TestDefaultSorter {
}
}
- assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+ verifyCounters(sorter, context);
}
- @Test(timeout = 30000)
+ @Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
OutputContext context = createTezOutputContext();
@@ -277,7 +280,37 @@ public class TestDefaultSorter {
}
}
assertTrue(spillIndex == spillCount);
- assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+ verifyCounters(sorter, context);
+ }
+
+ private void verifyCounters(DefaultSorter sorter, OutputContext context) {
+ TezCounter numShuffleChunks = context.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+ TezCounter additionalSpills = context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+ TezCounter additionalSpillBytesWritten = context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+ TezCounter additionalSpillBytesRead = context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+
+ if (sorter.isFinalMergeEnabled()) {
+ assertTrue(additionalSpills.getValue() == (sorter.getNumSpills() - 1));
+ //Number of files served by shuffle-handler
+ assertTrue(1 == numShuffleChunks.getValue());
+ if (sorter.getNumSpills() > 1) {
+ assertTrue(additionalSpillBytesRead.getValue() > 0);
+ assertTrue(additionalSpillBytesWritten.getValue() > 0);
+ }
+ } else {
+ assertTrue(0 == additionalSpills.getValue());
+ //Number of files served by shuffle-handler
+ assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+ assertTrue(additionalSpillBytesRead.getValue() == 0);
+ assertTrue(additionalSpillBytesWritten.getValue() == 0);
+ }
+
+ TezCounter finalOutputBytes = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+ assertTrue(finalOutputBytes.getValue() > 0);
+
+ TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
+ (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+ assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
}
private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
index 25c149d..52342a2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
@@ -256,7 +256,7 @@ public class TestPipelinedShuffle {
TezCounters counters = dagStatus.getDAGCounters();
//Ensure that atleast 10 spills were there in this job.
- assertTrue(counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT).getValue() > 10);
+ assertTrue(counters.findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT).getValue() > 10);
if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());