You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2017/01/25 07:33:02 UTC
tez git commit: TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 4765b558c -> abab52694
TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/abab5269
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/abab5269
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/abab5269
Branch: refs/heads/master
Commit: abab526940f6353866d866b93d6da685edfa6014
Parents: 4765b55
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Jan 25 13:02:53 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Jan 25 13:02:53 2017 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../common/sort/impl/PipelinedSorter.java | 20 ++++++++++++++++----
2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/abab5269/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 597df5d..8170d10 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results.
TEZ-3584. amKeepAliveService in TezClient should shutdown in case of AM failure.
TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics
TEZ-3579. Wrong configuration key for max slow start fraction in CartesianProductVertexManager.
http://git-wip-us.apache.org/repos/asf/tez/blob/abab5269/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 9b3aadb..4258fff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -339,6 +339,7 @@ public class PipelinedSorter extends ExternalSorter {
} else {
// queue up the sort
SortTask task = new SortTask(span, sorter);
+ LOG.debug("Submitting span={} for sort", span.toString());
Future<SpanIterator> future = sortmaster.submit(task);
merger.add(future);
span = newSpan;
@@ -975,8 +976,15 @@ public class PipelinedSorter extends ExternalSorter {
items = 1024*1024;
perItem = 16;
}
- newSpan = new SortSpan(remaining, items, perItem,
- ConfigUtils.getIntermediateOutputKeyComparator(conf));
+ final RawComparator newComparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
+ if (this.comparator == newComparator) {
+ LOG.warn("Same comparator used. comparator={}, newComparator={},"
+ + " hashCode: comparator={}, newComparator={}",
+ this.comparator, newComparator,
+ System.identityHashCode(this.comparator),
+ System.identityHashCode(newComparator));
+ }
+ newSpan = new SortSpan(remaining, items, perItem, newComparator);
newSpan.index = index+1;
LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
@@ -1284,6 +1292,7 @@ public class PipelinedSorter extends ExternalSorter {
}
public final boolean ready() throws IOException, InterruptedException {
+ int numSpanItr = futures.size();
try {
SpanIterator iter = null;
while(this.futures.size() > 0) {
@@ -1305,8 +1314,11 @@ public class PipelinedSorter extends ExternalSorter {
LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
return true;
} catch(ExecutionException e) {
- LOG.info(outputContext.getDestinationVertexName() + ": " + e.toString());
- return false;
+ LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={},"
+ + " futures.size={}, destVertexName={}",
+ heap.size(), total, eq, partition, gallop, numSpanItr, futures.size(),
+ outputContext.getDestinationVertexName(), e);
+ throw new IOException(e);
}
}