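You are viewing a plain-text version of this content. The canonical version is available on the original mailing-list archive page (the hyperlink was not preserved in this plain-text copy).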
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/21 19:36:46 UTC
[2/2] tez git commit: TEZ-2775. Improve and consolidate logging in
Runtime components. Contributed by Siddharth Seth and Rajesh Balamohan.
TEZ-2775. Improve and consolidate logging in Runtime components.
Contributed by Siddharth Seth and Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/983ceeee
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/983ceeee
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/983ceeee
Branch: refs/heads/master
Commit: 983ceeee19535b92e60b1f8c61bb1e8786b8ffa0
Parents: 7ed7025
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 21 10:36:32 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 21 10:36:32 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../resources/tez-container-log4j.properties | 2 +-
.../hadoop/mapred/split/TezGroupedSplit.java | 12 ++
.../hadoop/mapreduce/split/TezGroupedSplit.java | 12 ++
.../org/apache/tez/mapreduce/input/MRInput.java | 36 ++++--
.../tez/mapreduce/input/MRInputLegacy.java | 8 +-
.../tez/mapreduce/input/MultiMRInput.java | 22 ++--
.../apache/tez/mapreduce/output/MROutput.java | 13 ++-
.../runtime/LogicalIOProcessorRuntimeTask.java | 39 ++++++-
.../apache/tez/runtime/task/TaskReporter.java | 2 +-
.../org/apache/tez/http/HttpConnection.java | 4 +-
.../library/common/InputAttemptIdentifier.java | 2 +-
.../runtime/library/common/TezRuntimeUtils.java | 8 +-
.../runtime/library/common/ValuesIterator.java | 2 -
.../runtime/library/common/shuffle/Fetcher.java | 109 +++++++++++++------
.../common/shuffle/ShuffleEventHandler.java | 1 +
.../library/common/shuffle/ShuffleUtils.java | 40 +++++--
.../impl/ShuffleInputEventHandlerImpl.java | 41 +++++--
.../common/shuffle/impl/ShuffleManager.java | 84 ++++++++------
.../impl/SimpleFetchedInputAllocator.java | 26 +++--
.../orderedgrouped/FetcherOrderedGrouped.java | 66 ++++++++---
.../shuffle/orderedgrouped/MergeManager.java | 81 +++++++-------
.../common/shuffle/orderedgrouped/Shuffle.java | 27 +++--
.../ShuffleInputEventHandlerOrderedGrouped.java | 42 +++++--
.../orderedgrouped/ShuffleScheduler.java | 75 ++++++++-----
.../common/sort/impl/ExternalSorter.java | 26 +++--
.../common/sort/impl/PipelinedSorter.java | 107 +++++++++++-------
.../common/sort/impl/dflt/DefaultSorter.java | 75 +++++++------
.../writers/UnorderedPartitionedKVWriter.java | 49 +++++----
.../library/input/OrderedGroupedKVInput.java | 20 +++-
.../runtime/library/input/UnorderedKVInput.java | 16 ++-
.../output/OrderedPartitionedKVOutput.java | 6 +-
.../library/output/UnorderedKVOutput.java | 6 +-
.../output/UnorderedPartitionedKVOutput.java | 4 +-
.../impl/TestSimpleFetchedInputAllocator.java | 2 +-
.../sort/impl/dflt/TestDefaultSorter.java | 10 +-
36 files changed, 713 insertions(+), 364 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e93702..ee1e255 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2775. Improve and consolidate logging in Runtime components.
TEZ-2097. TEZ-UI Add dag logs backend support
TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
@@ -185,6 +186,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2775. Improve and consolidate logging in Runtime components.
TEZ-2097. TEZ-UI Add dag logs backend support
TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index c53994e..4620a78 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t]|| %c{2} %m%n:
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
#
# Event Counter Appender
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
index 4f3a0f2..a9893aa 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -175,4 +176,15 @@ public class TezGroupedSplit implements InputSplit, Configurable {
public String getRack() {
return rack;
}
+
+ @Override
+ public String toString() {
+ return "TezGroupedSplit{" +
+ "wrappedSplits=" + wrappedSplits +
+ ", wrappedInputFormatName='" + wrappedInputFormatName + '\'' +
+ ", locations=" + Arrays.toString(locations) +
+ ", rack='" + rack + '\'' +
+ ", length=" + length +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
index f85bbcd..430d2ec 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -185,4 +186,15 @@ public class TezGroupedSplit extends InputSplit
public String getRack() {
return rack;
}
+
+ @Override
+ public String toString() {
+ return "TezGroupedSplit{" +
+ "wrappedSplits=" + wrappedSplits +
+ ", wrappedInputFormatName='" + wrappedInputFormatName + '\'' +
+ ", locations=" + Arrays.toString(locations) +
+ ", rack='" + rack + '\'' +
+ ", length=" + length +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 70365cd..93161cb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -437,8 +437,9 @@ public class MRInput extends MRInputBase {
getContext().inputIsReady();
this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
- LOG.info("Using New mapreduce API: " + useNewApi
- + ", split information via event: " + splitInfoViaEvents);
+ LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
+ ", split via event=" + splitInfoViaEvents + ", numPhysicalInputs=" +
+ getNumPhysicalInputs());
initializeInternal();
return null;
}
@@ -447,7 +448,6 @@ public class MRInput extends MRInputBase {
public void start() {
Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1,
"Expecting 0 or 1 physical input for MRInput");
- LOG.info("MRInput setup to received {} events", getNumPhysicalInputs());
}
@Private
@@ -490,7 +490,7 @@ public class MRInput extends MRInputBase {
} finally {
rrLock.unlock();
}
- LOG.info("Initialzed MRInput: " + getContext().getSourceVertexName());
+ LOG.info("Initialized MRInput: " + getContext().getSourceVertexName());
}
/**
@@ -588,7 +588,9 @@ public class MRInput extends MRInputBase {
rrLock.lock();
try {
initFromEventInternal(event);
- LOG.info("Notifying on RecordReader Initialized");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " notifying on RecordReader initialized");
+ }
rrInited.signal();
} finally {
rrLock.unlock();
@@ -599,7 +601,9 @@ public class MRInput extends MRInputBase {
assert rrLock.getHoldCount() == 1;
rrLock.lock();
try {
- LOG.info("Awaiting RecordReader initialization");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " awaiting RecordReader initialization");
+ }
rrInited.await();
} catch (Exception e) {
throw new IOException(
@@ -621,22 +625,30 @@ public class MRInput extends MRInputBase {
}
private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException {
- LOG.info("Initializing RecordReader from event");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " initializing RecordReader from event");
+ }
Preconditions.checkState(initEvent != null, "InitEvent must be specified");
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
Object split = null;
if (useNewApi) {
split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
- LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: "
- + split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+ split.getClass().getName() + ", NewSplit: "
+ + split);
+ }
} else {
split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
- LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: "
- + split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+ split.getClass().getName() + ", OldSplit: "
+ + split);
+ }
}
mrReader.setSplit(split);
- LOG.info("Initialized RecordReader from event");
+ LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
}
private static class MRInputHelpersInternal extends MRInputHelpers {
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index d825d53..e83c36a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -73,7 +73,7 @@ public class MRInputLegacy extends MRInput {
@Private
protected void initializeInternal() throws IOException {
- LOG.info("MRInputLegacy deferring initialization");
+ LOG.info(getContext().getSourceVertexName() + " MRInputLegacy deferring initialization");
}
@Private
@@ -130,7 +130,11 @@ public class MRInputLegacy extends MRInput {
}
if (splitInfoViaEvents && !inited) {
if (initEvent == null) {
- LOG.info("Awaiting init event before initializing record reader");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() +
+ " awaiting init event before initializing record reader");
+ }
+
try {
eventCondition.await();
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 44d9c96..4a792dc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -108,8 +108,8 @@ public class MultiMRInput extends MRInputBase {
@Override
public List<Event> initialize() throws IOException {
super.initialize();
- LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: "
- + getNumPhysicalInputs());
+ LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
+ ", numPhysicalInputs=" + getNumPhysicalInputs());
if (getNumPhysicalInputs() == 0) {
getContext().inputIsReady();
}
@@ -164,7 +164,9 @@ public class MultiMRInput extends MRInputBase {
private MRReader initFromEvent(InputDataInformationEvent event) throws IOException {
Preconditions.checkState(event != null, "Event must be specified");
- LOG.info("Initializing Reader: " + eventCount.get());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get());
+ }
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
Object split = null;
MRReader reader = null;
@@ -176,17 +178,21 @@ public class MultiMRInput extends MRInputBase {
.getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
.getApplicationId().getId(), getContext().getTaskIndex(), getContext()
.getTaskAttemptNumber());
- LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: "
- + split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+ split.getClass().getName() + ", NewSplit: " + split);
+ }
} else {
split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
getContext().getCounters(), inputRecordCounter);
- LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: "
- + split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+ split.getClass().getName() + ", OldSplit: " + split);
+ }
}
- LOG.info("Initialized RecordReader from event");
+ LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
return reader;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index a3b19ed..12e5092 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -344,7 +344,6 @@ public class MROutput extends AbstractLogicalOutput {
@Override
public List<Event> initialize() throws IOException, InterruptedException {
- LOG.info("Initializing Simple Output");
getContext().requestInitialMemory(0l, null); //mandatory call
taskNumberFormat.setMinimumIntegerDigits(5);
taskNumberFormat.setGroupingUsed(false);
@@ -381,6 +380,8 @@ public class MROutput extends AbstractLogicalOutput {
}
}
+ String outputFormatClassName;
+
outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
if (useNewApi) {
@@ -389,6 +390,7 @@ public class MROutput extends AbstractLogicalOutput {
newOutputFormat =
org.apache.hadoop.util.ReflectionUtils.newInstance(
newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
+ outputFormatClassName = newOutputFormat.getClass().getName();
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
@@ -405,6 +407,7 @@ public class MROutput extends AbstractLogicalOutput {
jobConf, taskAttemptId,
new MRTaskReporter(getContext()));
oldOutputFormat = jobConf.getOutputFormat();
+ outputFormatClassName = oldOutputFormat.getClass().getName();
FileSystem fs = FileSystem.get(jobConf);
String finalName = getOutputName();
@@ -415,8 +418,9 @@ public class MROutput extends AbstractLogicalOutput {
}
initCommitter(jobConf, useNewApi);
- LOG.info("Initialized Simple Output"
- + ", using_new_api: " + useNewApi);
+ LOG.info(getContext().getDestinationVertexName() + ": "
+ + "outputFormat=" + outputFormatClassName
+ + ", using newmapreduce API=" + useNewApi);
return null;
}
@@ -518,6 +522,7 @@ public class MROutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> close() throws IOException {
flush();
+ LOG.info(getContext().getDestinationVertexName() + " closed");
long outputRecords = getContext().getCounters()
.findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
@@ -535,7 +540,6 @@ public class MROutput extends AbstractLogicalOutput {
return;
}
- LOG.info("Flushing Simple Output");
if (useNewApi) {
try {
newRecordWriter.close(newApiTaskAttemptContext);
@@ -545,7 +549,6 @@ public class MROutput extends AbstractLogicalOutput {
} else {
oldRecordWriter.close(null);
}
- LOG.info("Flushed Simple Output");
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index cc2e234..5b0e62f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -407,6 +407,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
protected Void callInternal() throws Exception {
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(oldThreadName + "{" + inputSpec.getSourceVertexName() + "}");
+ return _callInternal();
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+ }
+
+ protected Void _callInternal() throws Exception {
+
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing Input using InputSpec: " + inputSpec);
}
@@ -442,6 +453,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
protected Void callInternal() throws Exception {
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(oldThreadName + " Start: {" + srcVertexName + "}");
+ return _callInternal();
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+ }
+
+ protected Void _callInternal() throws Exception {
+ Thread.currentThread().setName("InitializerStart {" + srcVertexName + "}");
if (LOG.isDebugEnabled()) {
LOG.debug("Starting Input with src edge: " + srcVertexName);
}
@@ -464,6 +486,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
protected Void callInternal() throws Exception {
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(oldThreadName + "{" + outputSpec.getDestinationVertexName() + "}");
+ return _callInternal();
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+ }
+
+ protected Void _callInternal() throws Exception {
+ Thread.currentThread().setName("Initializer {" + outputSpec.getDestinationVertexName() + "}");
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing Output using OutputSpec: " + outputSpec);
}
@@ -741,8 +774,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
});
- eventRouterThread.setName("TezTaskEventRouter["
- + taskSpec.getTaskAttemptID().toString() + "]");
+ eventRouterThread.setName("TezTaskEventRouter{"
+ + taskSpec.getTaskAttemptID().toString() + "}");
eventRouterThread.start();
}
@@ -776,7 +809,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
public void cleanup() throws InterruptedException {
- LOG.info("Final Counters : " + getCounters().toShortString());
+ LOG.info("Final Counters for " + taskSpec.getTaskAttemptID() + ": " + getCounters().toShortString());
setTaskDone();
if (eventRouterThread != null) {
eventRouterThread.interrupt();
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 3579e3f..6705020 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -282,7 +282,7 @@ public class TaskReporter implements TaskReporterInterface {
int numEventsReceived = 0;
if (task.isTaskDone() || task.hadFatalError()) {
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
- LOG.info("Current task already complete, Ignoring all event in"
+ LOG.info("Current task already complete, Ignoring all events in"
+ " heartbeat response, eventCount=" + response.getEvents().size());
}
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
index 841e542..c94262a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -266,7 +266,9 @@ public class HttpConnection extends BaseHttpConnection {
stopWatch.reset().start();
try {
if (input != null) {
- LOG.info("Closing input on " + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing input on " + logIdentifier);
+ }
input.close();
input = null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 59fb638..d70942c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -146,6 +146,6 @@ public class InputAttemptIdentifier {
public String toString() {
return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
+ ", attemptNumber=" + attemptNumber + ", pathComponent="
- + pathComponent + ", fetchTypeInfo=" + fetchTypeInfo + ", spillEventId=" + spillEventId +"]";
+ + pathComponent + ", spillType=" + fetchTypeInfo.ordinal() + ", spillId=" + spillEventId +"]";
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 69436ba..819423f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -64,7 +64,9 @@ public class TezRuntimeUtils {
if (className == null) {
return null;
}
- LOG.info("Using Combiner class: " + className);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using Combiner class: " + className);
+ }
try {
clazz = (Class<? extends Combiner>) conf.getClassByName(className);
} catch (ClassNotFoundException e) {
@@ -105,7 +107,9 @@ public class TezRuntimeUtils {
+ conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS), e);
}
- LOG.info("Using partitioner class: " + clazz.getName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using partitioner class: " + clazz.getName());
+ }
Partitioner partitioner = null;
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index 24f9f8a..f4da742 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -45,7 +45,6 @@ import com.google.common.base.Preconditions;
@Private
public class ValuesIterator<KEY,VALUE> {
- private static final Logger LOG = LoggerFactory.getLogger(ValuesIterator.class.getName());
protected TezRawKeyValueIterator in; //input iterator
private KEY key; // current key
private KEY nextKey;
@@ -82,7 +81,6 @@ public class ValuesIterator<KEY,VALUE> {
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
- LOG.info("keyDeserializer=" + keyDeserializer + "; valueDeserializer=" + valDeserializer);
}
TezRawKeyValueIterator getRawIterator() { return in; }
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 8057be8..261f2e7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -219,8 +219,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
}
} else {
- LOG.info("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
- " inputs since the fetcher has already been stopped");
+ if (isDebugEnabled) {
+ LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
+ " inputs since the fetcher has already been stopped");
+ }
}
}
@@ -431,9 +433,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// indirectly penalizing the host
InputAttemptIdentifier[] failedFetches = null;
if (isShutDown.get()) {
- LOG.info(
- "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
- e.getClass().getName() + ", Message: " + e.getMessage());
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
+ e.getClass().getName() + ", Message: " + e.getMessage());
+ }
} else {
failedFetches = srcAttemptsRemaining.values().
toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
@@ -443,7 +447,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
if (isShutDown.get()) {
// shutdown would have no effect if in the process of establishing the connection.
shutdownInternal();
- LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+ if (isDebugEnabled) {
+ LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
+ }
return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false);
}
@@ -457,9 +463,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// with the first map, typically lost map. So, penalize only that map
// and add the rest
if (isShutDown.get()) {
- LOG.info(
- "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
- e.getClass().getName() + ", Message: " + e.getMessage());
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
+ e.getClass().getName() + ", Message: " + e.getMessage());
+ }
} else {
InputAttemptIdentifier firstAttempt = attempts.iterator().next();
LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
@@ -489,7 +497,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
if (isShutDown.get()) {
// shutdown would have no effect if in the process of establishing the connection.
shutdownInternal();
- LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
+ if (isDebugEnabled) {
+ LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
+ }
return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false);
}
// After this point, closing the stream and connection, should cause a
@@ -504,7 +514,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
if (isShutDown.get()) {
shutdownInternal(true);
- LOG.info("Fetcher already shutdown. Aborting queued fetches for " + srcAttemptsRemaining.size() + " inputs");
+ if (isDebugEnabled) {
+ LOG.debug("Fetcher already shutdown. Aborting queued fetches for " +
+ srcAttemptsRemaining.size() + " inputs");
+ }
return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null,
false);
}
@@ -514,7 +527,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
//clean up connection
shutdownInternal(true);
if (isShutDown.get()) {
- LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for " + srcAttemptsRemaining.size() + " inputs");
+ if (isDebugEnabled) {
+ LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " +
+ srcAttemptsRemaining.size() + " inputs");
+ }
return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null,
false);
}
@@ -527,8 +543,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
}
if (isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
- LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " +
- failedInputs.length + " failed inputs");
+ if (isDebugEnabled) {
+ LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " +
+ failedInputs.length + " failed inputs");
+ }
failedInputs = null;
}
return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedInputs,
@@ -546,7 +564,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Iterator<Entry<String, InputAttemptIdentifier>> iterator = srcAttemptsRemaining.entrySet().iterator();
while (iterator.hasNext()) {
if (isShutDown.get()) {
- LOG.info("Already shutdown. Skipping fetch for " + srcAttemptsRemaining.size() + " inputs");
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Already shutdown. Skipping fetch for " + srcAttemptsRemaining.size() + " inputs");
+ }
break;
}
InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
@@ -571,9 +592,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
@Override
public void freeResources(FetchedInput fetchedInput) {}
});
- LOG.info("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
- + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
- + " to " + fetchedInput.getType());
+ if (isDebugEnabled) {
+ LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+ + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+ + " to " + fetchedInput.getType());
+ }
long endTime = System.currentTimeMillis();
fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
@@ -582,9 +605,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
} catch (IOException e) {
cleanupFetchedInput(fetchedInput);
if (isShutDown.get()) {
- LOG.info(
- "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from host " +
- host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId +
+ " from host " +
+ host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+ }
break;
}
if (failMissing) {
@@ -598,8 +624,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
InputAttemptIdentifier[] failedFetches = null;
if (failMissing && srcAttemptsRemaining.size() > 0) {
if (isShutDown.get()) {
- LOG.info("Already shutdown, not reporting fetch failures for: " + srcAttemptsRemaining.size() +
- " remaining inputs");
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Already shutdown, not reporting fetch failures for: " + srcAttemptsRemaining.size() +
+ " remaining inputs");
+ }
} else {
failedFetches = srcAttemptsRemaining.values().
toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
@@ -653,6 +682,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public void shutdown() {
if (!isShutDown.getAndSet(true)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutting down fetcher for host: " + host);
+ }
shutdownInternal();
}
}
@@ -673,7 +705,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
} catch (IOException e) {
LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
+ e.getMessage());
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug(StringUtils.EMPTY, e);
}
}
@@ -708,7 +740,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// Don't know which one was bad, so consider all of them as bad
return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
} else {
- LOG.info("Already shutdown. Ignoring badId error with message: " + e.getMessage());
+ if (isDebugEnabled) {
+ LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
+ }
return null;
}
}
@@ -724,12 +758,14 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
assert (srcAttemptId != null);
return new InputAttemptIdentifier[]{srcAttemptId};
} else {
- LOG.info("Already shutdown. Ignoring verification failure.");
+ if (isDebugEnabled) {
+ LOG.debug("Already shutdown. Ignoring verification failure.");
+ }
return null;
}
}
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
+ ", decomp len: " + decompressedLength);
}
@@ -753,10 +789,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// }
// Go!
- LOG.info("fetcher" + " about to shuffle output of srcAttempt "
- + fetchedInput.getInputAttemptIdentifier() + " decomp: "
- + decompressedLength + " len: " + compressedLength + " to "
- + fetchedInput.getType());
+ if (isDebugEnabled) {
+ LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
+ + fetchedInput.getInputAttemptIdentifier() + " decomp: "
+ + decompressedLength + " len: " + compressedLength + " to "
+ + fetchedInput.getType());
+ }
if (fetchedInput.getType() == Type.MEMORY) {
ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
@@ -765,7 +803,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
fetchedInput.getInputAttemptIdentifier().toString());
} else if (fetchedInput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
- (host +":" +port), input, compressedLength, LOG,
+ (host +":" +port), input, compressedLength, decompressedLength, LOG,
fetchedInput.getInputAttemptIdentifier().toString());
} else {
throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
@@ -795,8 +833,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
} catch (IOException ioe) {
if (isShutDown.get()) {
cleanupFetchedInput(fetchedInput);
- LOG.info("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() +
- ", Message: " + ioe.getMessage());
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() +
+ ", Message: " + ioe.getMessage());
+ }
return null;
}
if (shouldRetry(srcAttemptId, ioe)) {
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
index ff66158..da7c944 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
@@ -25,4 +25,5 @@ import org.apache.tez.runtime.api.Event;
public interface ShuffleEventHandler {
public void handleEvents(List<Event> events) throws IOException;
+  public void logProgress(boolean updateOnClose); // NOTE(review): implementations log event-handling progress counters; updateOnClose presumably marks the final report on close - confirm
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index c7cc907..818cfaa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -117,12 +117,16 @@ public class ShuffleUtils {
Logger LOG, String identifier) throws IOException {
try {
IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
-        ifileReadAhead, ifileReadAheadLength);
+          ifileReadAhead, ifileReadAheadLength);
// metrics.inputBytes(shuffleData.length);
-      LOG.info("Read " + shuffleData.length + " bytes from input for "
-          + identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Read " + shuffleData.length + " bytes from input for "
+            + identifier);
+      }
} catch (IOException ioe) {
// Close the streams
+      LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength +
+          ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage());
ioCleanup(input);
// Re-throw
throw ioe;
@@ -130,7 +134,7 @@ public class ShuffleUtils {
}
public static void shuffleToDisk(OutputStream output, String hostIdentifier,
-      InputStream input, long compressedLength, Logger LOG, String identifier)
+      InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier)
throws IOException {
// Copy data to local-disk
long bytesLeft = compressedLength;
@@ -148,12 +152,16 @@ public class ShuffleUtils {
// metrics.inputBytes(n);
}
-      LOG.info("Read " + (compressedLength - bytesLeft)
-          + " bytes from input for " + identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Read " + (compressedLength - bytesLeft)
+            + " bytes from input for " + identifier);
+      }
output.close();
} catch (IOException ioe) {
// Close the streams
+      LOG.info("Failed to read data to disk for " + identifier + ". len=" + compressedLength +
+          ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage());
ioCleanup(input, output);
// Re-throw
throw ioe;
@@ -479,12 +487,28 @@ public class ShuffleUtils {
}
log.info(
"Completed fetch for attempt: "
-            + srcAttemptIdentifier + " to " + outputType +
-            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+            + toShortString(srcAttemptIdentifier)
+            +" to " + outputType +
+            ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
MBPS_FORMAT.get().format(rate) + " MB/s");
}
+  private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) { // compact log form: {inputIndex, attemptNumber, pathComponent[, fetchTypeOrdinal, spillEventId]}
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex());
+    sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
+    sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
+    if (inputAttemptIdentifier.getFetchTypeInfo() // spill ordinal and spill event id are appended only for non-FINAL_MERGE_ENABLED fetches
+        != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
+      sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
+      sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
/**
* Build {@link org.apache.tez.http.HttpConnectionParams} from configuration
*
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 61b3e3a..8fb1568 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -22,6 +22,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
import java.io.IOException;
import java.util.BitSet;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.protobuf.ByteString;
@@ -32,20 +33,15 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
-import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
-import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
-import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -57,16 +53,24 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerImpl.class);
private final ShuffleManager shuffleManager;
+  //TODO: unused. Consider removing later?
private final FetchedInputAllocator inputAllocator;
private final CompressionCodec codec;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
private final boolean useSharedInputs;
+  private final InputContext inputContext;
+
+  private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
+  private final AtomicInteger numDmeEvents = new AtomicInteger(0);
+  private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
+  private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
public ShuffleInputEventHandlerImpl(InputContext inputContext,
ShuffleManager shuffleManager,
FetchedInputAllocator inputAllocator, CompressionCodec codec,
boolean ifileReadAhead, int ifileReadAheadLength) {
+    this.inputContext = inputContext;
this.shuffleManager = shuffleManager;
this.inputAllocator = inputAllocator;
this.codec = codec;
@@ -86,13 +90,29 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
private void handleEvent(Event event) throws IOException {
if (event instanceof DataMovementEvent) {
+      numDmeEvents.incrementAndGet();
processDataMovementEvent((DataMovementEvent)event);
shuffleManager.updateEventReceivedTime();
} else if (event instanceof InputFailedEvent) {
-      processInputFailedEvent((InputFailedEvent)event);
+      numObsoletionEvents.incrementAndGet();
+      processInputFailedEvent((InputFailedEvent) event);
} else {
throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
}
+    if (numDmeEvents.get() + numObsoletionEvents.get() > nextToLogEventCount.get()) {
+      logProgress(false);
+      // Log every 50 events seen.
+      nextToLogEventCount.addAndGet(50);
+    }
+  }
+
+  @Override
+  public void logProgress(boolean updateOnClose) { // logs cumulative DME / no-data DME / obsoletion counts; updateOnClose appends a marker
+    LOG.info(inputContext.getSourceVertexName() + ": "
+        + "numDmeEventsSeen=" + numDmeEvents.get()
+        + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
+        + ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
+        + (updateOnClose ? ", updateOnClose" : ""));
}
private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
@@ -104,9 +124,11 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
}
int srcIndex = dme.getSourceIndex();
-    LOG.info("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
-        + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
-        .stringify(shufflePayload));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
+          + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
+          .stringify(shufflePayload));
+    }
if (shufflePayload.hasEmptyPartitions()) {
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
@@ -119,6 +141,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: ["
+ srcAttemptIdentifier + "]. Not fetching.");
}
+        numDmeEventsNoData.incrementAndGet();
shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
return;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 1977d5f..a7c1c59 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -226,11 +226,11 @@ public class ShuffleManager implements FetcherCallback {
ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
numFetchers,
new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
+            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
+        .setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build());
this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
this.schedulerCallable = new RunShuffleCallable(conf);
@@ -268,7 +268,7 @@ public class ShuffleManager implements FetcherCallback {
shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>();
-    LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+    LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec="
+ (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+ numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+ ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
@@ -315,7 +315,7 @@ public class ShuffleManager implements FetcherCallback {
}
if (LOG.isDebugEnabled()) {
-        LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+        LOG.debug(srcNameTrimmed + ": " + "NumCompletedInputs: " + numCompletedInputs);
}
if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
lock.lock();
@@ -328,7 +328,7 @@ public class ShuffleManager implements FetcherCallback {
inputHost = pendingHosts.take();
} catch (InterruptedException e) {
if (isShutdown.get()) {
-              LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+              LOG.info(srcNameTrimmed + ": " + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
Thread.currentThread().interrupt();
break;
} else {
@@ -336,13 +336,13 @@ public class ShuffleManager implements FetcherCallback {
}
}
if (LOG.isDebugEnabled()) {
-            LOG.debug("Processing pending host: " + inputHost.toDetailedString());
+            LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + inputHost.toDetailedString());
}
if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
Fetcher fetcher = constructFetcherForHost(inputHost, conf);
runningFetchers.add(fetcher);
if (isShutdown.get()) {
-              LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+              LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
break;
}
ListenableFuture<FetchResult> future = fetcherExecutor
@@ -353,7 +353,7 @@ public class ShuffleManager implements FetcherCallback {
}
} else {
if (LOG.isDebugEnabled()) {
-              LOG.debug("Skipping host: " + inputHost.getIdentifier()
+              LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + inputHost.getIdentifier()
+ " since it has no inputs to process");
}
}
@@ -364,8 +364,7 @@ public class ShuffleManager implements FetcherCallback {
}
}
shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
-      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
-      // TODO NEWTEZ Maybe clean up inputs.
+      LOG.info(srcNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
if (!fetcherExecutor.isShutdown()) {
fetcherExecutor.shutdownNow();
}
@@ -450,9 +449,11 @@ public class ShuffleManager implements FetcherCallback {
}
fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
-    LOG.info("Created Fetcher for host: " + inputHost.getHost()
-        + ", info: " + inputHost.getAdditionalInfo()
-        + ", with inputs: " + pendingInputsForHost);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(srcNameTrimmed + ": " + "Created Fetcher for host: " + inputHost.getHost()
+          + ", info: " + inputHost.getAdditionalInfo()
+          + ", with inputs: " + pendingInputsForHost);
+    }
return fetcherBuilder.build();
}
@@ -471,7 +472,7 @@ public class ShuffleManager implements FetcherCallback {
}
}
if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
+      LOG.debug(srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host);
}
if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
@@ -501,7 +502,9 @@ public class ShuffleManager implements FetcherCallback {
public void addCompletedInputWithNoData(
InputAttemptIdentifier srcAttemptIdentifier) {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(srcNameTrimmed + ": " + "No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    }
if (!completedInputSet.contains(inputIdentifier)) {
synchronized (completedInputSet) {
@@ -574,8 +577,10 @@ public class ShuffleManager implements FetcherCallback {
}
boolean isDone() {
-      LOG.info("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" +
-          eventsProcessed.cardinality());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" +
+            eventsProcessed.cardinality());
+      }
return ((finalEventId != -1) && (finalEventId + 1) == eventsProcessed.cardinality());
}
@@ -631,10 +636,10 @@ public class ShuffleManager implements FetcherCallback {
lock.lock();
try {
totalBytesShuffledTillNow += fetchedBytes;
+          logProgress();
} finally {
lock.unlock();
}
-          logProgress();
}
}
}
@@ -751,7 +756,7 @@ public class ShuffleManager implements FetcherCallback {
InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
-    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+    LOG.info(srcNameTrimmed + ": " + "Fetch failed for src: " + srcAttemptIdentifier
+ "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+ connectFailed);
failedShufflesCounter.increment(1);
@@ -778,7 +783,7 @@ public class ShuffleManager implements FetcherCallback {
if (Thread.currentThread().isInterrupted()) {
//TODO: need to cleanup all FetchedInput (DiskFetchedInput, LocalDisFetchedInput), lockFile
//As of now relying on job cleanup (when all directories would be cleared)
-      LOG.info("Thread interrupted. Need to cleanup the local dirs");
+      LOG.info(srcNameTrimmed + ": " + "Thread interrupted. Need to cleanup the local dirs");
}
if (!isShutdown.getAndSet(true)) {
// Shut down any pending fetchers
@@ -890,16 +895,23 @@ public class ShuffleManager implements FetcherCallback {
}
}
+  private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0); // completed-input count after which the next progress line is logged
+
private void logProgress() {
-    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
int inputsDone = numCompletedInputs.get();
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-    double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " +
-        numInputs +
-        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+    if (inputsDone > nextProgressLineEventCount.get() || inputsDone == numInputs) {
+      nextProgressLineEventCount.set(inputsDone + 50); // re-anchor on the observed count so the threshold cannot lag when inputs complete between calls
+      double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
+      long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+      double transferRate = mbs / secsSinceStart;
+      LOG.info(srcNameTrimmed + ": " + "copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " +
+          numInputs +
+          ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+          + mbpsFormat.format(transferRate) + " MB/s)");
+
+    }
}
@@ -907,15 +919,17 @@ public class ShuffleManager implements FetcherCallback {
@Override
public void onSuccess(Void result) {
-      LOG.info("Scheduler thread completed");
+      LOG.info(srcNameTrimmed + ": " + "Scheduler thread completed");
}
@Override
public void onFailure(Throwable t) {
if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error: " + t);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error: " + t);
+        }
} else {
-        LOG.error("Scheduler failed with error: ", t);
+        LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t);
inputContext.fatalError(t, "Shuffle Scheduler Failed");
}
}
@@ -944,7 +958,9 @@ public class ShuffleManager implements FetcherCallback {
public void onSuccess(FetchResult result) {
fetcher.shutdown();
if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring event from fetcher");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring event from fetcher");
+        }
} else {
Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
@@ -965,9 +981,11 @@ public class ShuffleManager implements FetcherCallback {
// Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
fetcher.shutdown();
if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error from fetcher: " + t);
+        }
} else {
-        LOG.error("Fetcher failed with error: ", t);
+        LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t);
shuffleError = t;
inputContext.fatalError(t, "Fetch failed");
doBookKeepingForFetcherComplete();
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
index 31a8651..604d213 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
@@ -60,11 +60,14 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
private final long maxAvailableTaskMemory;
private final long initialMemoryAvailable;
+
+  private final String srcNameTrimmed; // source name used as the prefix of this allocator's log lines
private volatile long usedMemory = 0;
-  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
+  public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifier, Configuration conf,
long maxTaskAvailableMemory, long memoryAvailable) {
+    this.srcNameTrimmed = srcNameTrimmed;
this.conf = conf;
this.maxAvailableTaskMemory = maxTaskAvailableMemory;
this.initialMemoryAvailable = memoryAvailable;
@@ -92,8 +95,6 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
this.memoryLimit = initialMemoryAvailable;
}
-    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-
final float singleShuffleMemoryLimitPercent = conf.getFloat(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
@@ -107,9 +108,13 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
//TODO: cap it to MAX_VALUE until MemoryFetchedInput can handle > 2 GB
this.maxSingleShuffleLimit = (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent),
Integer.MAX_VALUE);
-
-    LOG.info("SimpleInputManager -> " + "MemoryLimit: " +
-        this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+
+    LOG.info(srcNameTrimmed + ": "
+        + "RequestedMemory=" + memReq
+        + ", AssignedMemory=" + this.memoryLimit
+        + ", maxSingleShuffleLimit=" + this.maxSingleShuffleLimit
+    );
+
}
@Private
@@ -137,7 +142,10 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
fileNameAllocator);
} else {
this.usedMemory += actualSize;
-      LOG.info("Used memory after allocating " + actualSize + " : " + usedMemory);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(srcNameTrimmed + ": " + "Used memory after allocating " + actualSize + " : " +
+            usedMemory);
+      }
return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
}
}
@@ -196,7 +204,9 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
private synchronized void unreserve(long size) {
this.usedMemory -= size;
-    LOG.info("Used memory after freeing " + size + " : " + usedMemory);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(srcNameTrimmed + ": " + "Used memory after freeing " + size + " : " + usedMemory);
+    }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 47df8f2..1b4031d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -190,6 +190,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
public void shutDown() {
if (!stopped) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fetcher stopped for host " + mapHost);
+ }
stopped = true;
// An interrupt will come in while shutting down the thread.
cleanupCurrentConnection(false);
@@ -264,14 +267,19 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// Setup connection again if disconnected
cleanupCurrentConnection(true);
if (stopped) {
- LOG.info("Not re-establishing connection since Fetcher has been stopped");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not re-establishing connection since Fetcher has been stopped");
+ }
return;
}
// Connect with retry
if (!setupConnection(host, remaining.values())) {
if (stopped) {
cleanupCurrentConnection(true);
- LOG.info("Not reporting connection re-establishment failure since fetcher is stopped");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Not reporting connection re-establishment failure since fetcher is stopped");
+ }
return;
}
failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
@@ -282,8 +290,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
if (failedTasks != null && failedTasks.length > 0) {
if (stopped) {
- LOG.info("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) +
- " since Fetcher has been stopped");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) +
+ " since Fetcher has been stopped");
+ }
} else {
LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
for (InputAttemptIdentifier left : failedTasks) {
@@ -316,7 +326,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
connectSucceeded = httpConnection.connect();
if (stopped) {
- LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
+ }
return false;
}
input = httpConnection.getInputStream();
@@ -327,7 +339,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
Thread.currentThread().interrupt(); //reset status
}
if (stopped) {
- LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not reporting fetch failure, since an Exception was caught after shutdown");
+ }
return false;
}
ioErrs.increment(1);
@@ -393,7 +407,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
} else {
- LOG.info("Already shutdown. Ignoring invalid map id error");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Already shutdown. Ignoring invalid map id error");
+ }
return EMPTY_ATTEMPT_ID_ARRAY;
}
}
@@ -410,8 +426,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// the remaining because we dont know where to start reading from. YARN-1773
return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
} else {
- LOG.info("Already shutdown. Ignoring invalid map id error. Exception: " +
- e.getClass().getName() + ", Message: " + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " +
+ e.getClass().getName() + ", Message: " + e.getMessage());
+ }
return EMPTY_ATTEMPT_ID_ARRAY;
}
}
@@ -427,7 +445,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
assert (srcAttemptId != null);
return new InputAttemptIdentifier[]{srcAttemptId};
} else {
- LOG.info("Already stopped. Ignoring verification failure.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Already stopped. Ignoring verification failure.");
+ }
return EMPTY_ATTEMPT_ID_ARRAY;
}
}
@@ -446,7 +466,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
ioErrs.increment(1);
scheduler.reportLocalError(e);
} else {
- LOG.info("Already stopped. Ignoring error from merger.reserve");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Already stopped. Ignoring error from merger.reserve");
+ }
}
return EMPTY_ATTEMPT_ID_ARRAY;
}
@@ -459,16 +481,19 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
}
// Go!
- LOG.info("fetcher#" + id + " about to shuffle output of map " +
- mapOutput.getAttemptIdentifier() + " decomp: " +
- decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("fetcher#" + id + " about to shuffle output of map " +
+ mapOutput.getAttemptIdentifier() + " decomp: " +
+ decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
+ }
+
if (mapOutput.getType() == Type.MEMORY) {
ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
(int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
} else if (mapOutput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
- input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
+ input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
} else {
throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
mapOutput.getType());
@@ -487,8 +512,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
return null;
} catch (IOException ioe) {
if (stopped) {
- LOG.info("Not reporting fetch failure for exception during data copy: ["
- + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not reporting fetch failure for exception during data copy: ["
+ + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+ }
cleanupCurrentConnection(true);
if (mapOutput != null) {
mapOutput.abort(); // Release resources
@@ -654,7 +681,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
host.getHostIdentifier(), e);
} else {
- LOG.info("Ignoring fetch error during local disk copy since fetcher has already been stopped");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Ignoring fetch error during local disk copy since fetcher has already been stopped");
+ }
return;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 0a44a6b..eb2cece 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -17,27 +17,12 @@
*/
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
import com.google.common.annotations.VisibleForTesting;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import com.google.common.base.Preconditions;
import org.apache.commons.io.FilenameUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileStatus;
@@ -61,12 +46,25 @@ import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
-import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -229,10 +227,15 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
} else {
this.postMergeMemLimit = maxRedBuffer;
}
-
- LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
- + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem="
- + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ inputContext.getSourceVertexName() + ": " + "InitialRequest: ShuffleMem=" + memLimit +
+ ", postMergeMem=" + maxRedBuffer
+ + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable +
+ ". Updated to: ShuffleMem="
+ + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
+ }
this.ioSortFactor =
conf.getInt(
@@ -262,10 +265,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
conf.getFloat(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
- LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
+ LOG.info(inputContext.getSourceVertexName() + ": MergerManager: memoryLimit=" + memoryLimit + ", " +
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
"mergeThreshold=" + mergeThreshold + ", " +
"ioSortFactor=" + ioSortFactor + ", " +
+ "postMergeMem=" + postMergeMemLimit + ", " +
"memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
@@ -320,8 +324,6 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
long memLimit = conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
(long)(maxAvailableTaskMemory * maxInMemCopyUse));
- LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
-
float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
@@ -329,7 +331,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
long maxRedBuffer = (long) (maxAvailableTaskMemory * maxRedPer);
- LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
+ LOG.info("Initial Memory required for SHUFFLE_BUFFER=" + memLimit +
+ " based on INPUT_BUFFER_FACTOR=" + maxInMemCopyUse + ", for final merged output=" +
+ maxRedBuffer + ", using factor: " + maxRedPer);
long reqMem = Math.max(maxRedBuffer, memLimit);
return reqMem;
@@ -384,9 +388,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
int fetcher
) throws IOException {
if (!canShuffleToMemory(requestedSize)) {
- LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize +
- " is greater than maxSingleShuffleLimit (" +
- maxSingleShuffleLimit + ")");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize +
+ " is greater than maxSingleShuffleLimit (" +
+ maxSingleShuffleLimit + ")");
+ }
return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf,
fetcher, true, mapOutputFile);
}
@@ -450,9 +456,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
public synchronized void closeInMemoryFile(MapOutput mapOutput) {
inMemoryMapOutputs.add(mapOutput);
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
- + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
- + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" +
- mapOutput);
+ + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+ + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" +
+ mapOutput);
commitMemory+= mapOutput.getSize();
@@ -474,7 +480,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
private void startMemToDiskMerge() {
synchronized (inMemoryMerger) {
if (!inMemoryMerger.isInProgress()) {
- LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+ LOG.info(inputContext.getSourceVertexName() + ": " + "Starting inMemoryMerger's merge since commitMemory=" +
commitMemory + " > mergeThreshold=" + mergeThreshold +
". Current usedMemory=" + usedMemory);
inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
@@ -486,7 +492,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
inMemoryMergedMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
+ LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
", inMemoryMergedMapOutputs.size() -> " +
inMemoryMergedMapOutputs.size());
}
@@ -631,7 +637,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
Writer writer =
new InMemoryWriter(mergedMapOutputs.getArrayStream());
- LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+ LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
// Nothing will be materialized to disk because the sort factor is being
@@ -648,7 +654,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
writer.close();
- LOG.info(inputContext.getUniqueIdentifier() +
+ LOG.info(inputContext.getSourceVertexName() +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
@@ -715,7 +721,6 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
outputPath = mapOutputFile.getInputFileForWrite(
srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
- LOG.info("Patch..InMemoryMerger outputPath: " + outputPath);
Writer writer = null;
long outFileLen = 0;
@@ -891,7 +896,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
final long outputLen = localFS.getFileStatus(outputPath).getLen();
closeOnDiskFile(new FileChunk(outputPath, 0, outputLen));
- LOG.info(inputContext.getUniqueIdentifier() +
+ LOG.info(inputContext.getSourceVertexName() +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +