You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/11/13 14:54:30 UTC
[1/2] tez git commit: TEZ-2918. Make progress notifications in IOs (bikas)
Repository: tez
Updated Branches:
refs/heads/master 344a8cc14 -> 5ec498d8f
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index 0de400e..c8ae67b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -22,21 +22,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.MergedInputContext;
import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.junit.Test;
@@ -44,8 +42,7 @@ import org.junit.Test;
public class TestSortedGroupedMergedInput {
MergedInputContext createMergedInputContext() {
- return new TezMergedInputContextImpl(null, "mergedInputName", new HashMap<String, MergedLogicalInput>(),
- mock(InputReadyTracker.class), null);
+ return mock(MergedInputContext.class);
}
@Test(timeout = 5000)
@@ -67,7 +64,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput1);
sInputs.add(sInput2);
sInputs.add(sInput3);
- OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
+ MergedInputContext mockContext = createMergedInputContext();
+ OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(mockContext, sInputs);
KeyValuesReader kvsReader = input.getReader();
int keyCount = 0;
@@ -84,6 +82,7 @@ public class TestSortedGroupedMergedInput {
}
assertEquals(6, valCount);
}
+ verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
getNextFromFinishedReader(kvsReader);
}
@@ -120,7 +119,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput2);
sInputs.add(sInput3);
- OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
+ OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(
+ createMergedInputContext(), sInputs);
KeyValuesReader kvsReader = input.getReader();
int keyCount = 0;
@@ -384,8 +384,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput1);
sInputs.add(sInput2);
sInputs.add(sInput3);
- ConcatenatedMergedKeyValueInput input =
- new ConcatenatedMergedKeyValueInput(createMergedInputContext(), sInputs);
+ MergedInputContext mockContext = createMergedInputContext();
+ ConcatenatedMergedKeyValueInput input = new ConcatenatedMergedKeyValueInput(mockContext, sInputs);
KeyValueReader kvReader = input.getReader();
int keyCount = 0;
@@ -395,6 +395,7 @@ public class TestSortedGroupedMergedInput {
Integer value = (Integer) kvReader.getCurrentValue();
}
assertTrue(keyCount == 30);
+ verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
getNextFromFinishedReader(kvReader);
}
@@ -418,8 +419,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput1);
sInputs.add(sInput2);
sInputs.add(sInput3);
- ConcatenatedMergedKeyValuesInput input =
- new ConcatenatedMergedKeyValuesInput(createMergedInputContext(), sInputs);
+ MergedInputContext mockContext = createMergedInputContext();
+ ConcatenatedMergedKeyValuesInput input = new ConcatenatedMergedKeyValuesInput(mockContext, sInputs);
KeyValuesReader kvsReader = input.getReader();
int keyCount = 0;
@@ -435,6 +436,7 @@ public class TestSortedGroupedMergedInput {
assertEquals(2, valCount);
}
assertEquals(9, keyCount);
+ verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
getNextFromFinishedReader(kvsReader);
}
[2/2] tez git commit: TEZ-2918. Make progress notifications in IOs (bikas)
Posted by bi...@apache.org.
TEZ-2918. Make progress notifications in IOs (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ec498d8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ec498d8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ec498d8
Branch: refs/heads/master
Commit: 5ec498d8f5b6cecf4eaee8f995ea2cf9ca2acfcc
Parents: 344a8cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 13 05:54:09 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Nov 13 05:54:09 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/runtime/api/MergedInputContext.java | 5 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +-
.../org/apache/tez/mapreduce/input/MRInput.java | 10 +-
.../tez/mapreduce/input/MultiMRInput.java | 4 +-
.../org/apache/tez/mapreduce/lib/MRReader.java | 16 ++
.../tez/mapreduce/lib/MRReaderMapReduce.java | 10 +-
.../tez/mapreduce/lib/MRReaderMapred.java | 11 +-
.../apache/tez/mapreduce/output/MROutput.java | 1 +
.../apache/tez/mapreduce/input/TestMRInput.java | 3 +
.../tez/mapreduce/input/TestMultiMRInput.java | 4 +
.../tez/mapreduce/lib/TestKVReadersWithMR.java | 12 +-
.../tez/mapreduce/output/TestMROutput.java | 161 +++++++++++++++++++
.../runtime/LogicalIOProcessorRuntimeTask.java | 2 +-
.../org/apache/tez/runtime/RuntimeTask.java | 9 +-
.../api/impl/TezMergedInputContextImpl.java | 11 +-
.../runtime/api/impl/TezTaskContextImpl.java | 2 +-
.../tez/runtime/TestInputReadyTracker.java | 6 +-
.../common/readers/UnorderedKVReader.java | 8 +-
.../library/common/shuffle/ShuffleUtils.java | 1 +
.../common/shuffle/impl/ShuffleManager.java | 2 +
.../shuffle/orderedgrouped/MergeManager.java | 35 ++--
.../common/shuffle/orderedgrouped/Shuffle.java | 4 +-
.../orderedgrouped/ShuffleScheduler.java | 3 +-
.../common/sort/impl/ExternalSorter.java | 11 +-
.../common/sort/impl/PipelinedSorter.java | 10 +-
.../common/sort/impl/dflt/DefaultSorter.java | 10 +-
.../writers/UnorderedPartitionedKVWriter.java | 9 +-
.../input/ConcatenatedMergedKeyValueInput.java | 2 +
.../input/ConcatenatedMergedKeyValuesInput.java | 2 +
.../library/input/OrderedGroupedKVInput.java | 8 +-
.../input/OrderedGroupedMergedKVInput.java | 8 +-
.../runtime/library/input/UnorderedKVInput.java | 3 +-
.../common/readers/TestUnorderedKVReader.java | 5 +-
.../orderedgrouped/TestMergeManager.java | 3 +
.../orderedgrouped/TestShuffleScheduler.java | 2 +
.../common/sort/impl/TestPipelinedSorter.java | 38 ++---
.../sort/impl/dflt/TestDefaultSorter.java | 2 +
.../TestUnorderedPartitionedKVWriter.java | 4 +-
.../input/TestSortedGroupedMergedInput.java | 26 +--
40 files changed, 383 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ce1640..52c73da 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
ALL CHANGES:
+ TEZ-2918. Make progress notifications in IOs
TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
TEZ-2930. Tez UI: Parent controller is not polling at times
TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion.
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
index 41c519b..65bb087 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
@@ -42,6 +42,11 @@ public interface MergedInputContext {
public void inputIsReady();
/**
+ * Inform the framework that progress has been made
+ */
+ public void notifyProgress();
+
+ /**
* Get the work directories for the Input
* @return an array of work dirs
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 27eb69b..bfd1634 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -193,6 +193,7 @@ public class TaskAttemptImpl implements TaskAttempt,
org.apache.tez.runtime.api.impl.TaskStatistics statistics;
long lastNotifyProgressTimestamp = 0;
+ private final long hungIntervalMax;
// Used to store locality information when
Set<String> taskHosts = new HashSet<String>();
@@ -488,6 +489,10 @@ public class TaskAttemptImpl implements TaskAttempt,
this.taskResource = resource;
this.containerContext = containerContext;
this.leafVertex = leafVertex;
+ this.hungIntervalMax = conf.getLong(
+ TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS,
+ TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
+
}
@Override
@@ -1378,14 +1383,11 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.lastNotifyProgressTimestamp = ta.clock.getTime();
} else {
long currTime = ta.clock.getTime();
- long hungIntervalMax = ta.conf.getLong(
- TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
- if (hungIntervalMax > 0 &&
- currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) {
+ if (ta.hungIntervalMax > 0 &&
+ currTime - ta.lastNotifyProgressTimestamp > ta.hungIntervalMax) {
// task is hung
String diagnostics = "Attempt failed because it appears to make no progress for " +
- hungIntervalMax + "ms";
+ ta.hungIntervalMax + "ms";
LOG.info(diagnostics + " " + ta.getID());
// send event that will fail this attempt
ta.sendEvent(
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 93161cb..b68d135 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -461,9 +461,10 @@ public class MRInput extends MRInputBase {
mrReader = new MRReaderMapReduce(jobConf, getContext().getCounters(), inputRecordCounter,
getContext().getApplicationId().getClusterTimestamp(), getContext()
.getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext()
- .getTaskIndex(), getContext().getTaskAttemptNumber());
+ .getTaskIndex(), getContext().getTaskAttemptNumber(), getContext());
} else {
- mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter);
+ mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter,
+ getContext());
}
} else {
TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
@@ -477,14 +478,14 @@ public class MRInput extends MRInputBase {
mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
- getContext().getTaskIndex(), getContext().getTaskAttemptNumber());
+ getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), getContext());
} else {
org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
.getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
.findCounter(TaskCounter.SPLIT_RAW_BYTES));
mrReader =
new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(),
- inputRecordCounter);
+ inputRecordCounter, getContext());
}
}
} finally {
@@ -508,6 +509,7 @@ public class MRInput extends MRInputBase {
return new KeyValueReader() {
@Override
public boolean next() throws IOException {
+ getContext().notifyProgress();
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 4a792dc..2b60f29 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -177,7 +177,7 @@ public class MultiMRInput extends MRInputBase {
getContext().getCounters(), inputRecordCounter, getContext().getApplicationId()
.getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
.getApplicationId().getId(), getContext().getTaskIndex(), getContext()
- .getTaskAttemptNumber());
+ .getTaskAttemptNumber(), getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split);
@@ -186,7 +186,7 @@ public class MultiMRInput extends MRInputBase {
} else {
split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
- getContext().getCounters(), inputRecordCounter);
+ getContext().getCounters(), inputRecordCounter, getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
split.getClass().getName() + ", OldSplit: " + split);
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
index 8a20827..aa35fec 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
@@ -21,14 +21,30 @@ package org.apache.tez.mapreduce.lib;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
@Private
public abstract class MRReader extends KeyValueReader {
+
+ private final InputContext context;
+
public abstract void setSplit(Object split) throws IOException;
public abstract boolean isSetup();
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
public abstract Object getSplit();
public abstract Object getRecordReader();
+
+ public MRReader(InputContext context) {
+ this.context = context;
+ }
+
+ protected final void notifyProgress() {
+ context.notifyProgress();
+ }
+
+ protected final void notifyDone() {
+ context.notifyProgress();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 5fc3e49..10b871e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib;
import java.io.IOException;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,15 +52,16 @@ public class MRReaderMapReduce extends MRReader {
private boolean setupComplete = false;
public MRReaderMapReduce(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter,
- long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber)
+ long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber, InputContext context)
throws IOException {
this(jobConf, null, tezCounters, inputRecordCounter, clusterId, vertexIndex, appId, taskIndex,
- taskAttemptNumber);
+ taskAttemptNumber, context);
}
public MRReaderMapReduce(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
TezCounter inputRecordCounter, long clusterId, int vertexIndex, int appId, int taskIndex,
- int taskAttemptNumber) throws IOException {
+ int taskAttemptNumber, InputContext context) throws IOException {
+ super(context);
this.inputRecordCounter = inputRecordCounter;
this.taskAttemptContext = new TaskAttemptContextImpl(jobConf, tezCounters, clusterId,
vertexIndex, appId, taskIndex, taskAttemptNumber, true, null);
@@ -121,9 +123,11 @@ public class MRReaderMapReduce extends MRReader {
}
if (hasNext) {
inputRecordCounter.increment(1);
+ notifyProgress();
} else {
hasCompletedProcessing();
completedProcessing = true;
+ notifyDone();
}
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 1bf71f6..d81debb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -33,6 +33,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.InputContext;
import com.google.common.base.Preconditions;
@@ -56,13 +57,15 @@ public class MRReaderMapred extends MRReader {
private boolean setupComplete = false;
- public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter)
+ public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter,
+ InputContext context)
throws IOException {
- this(jobConf, null, tezCounters, inputRecordCounter);
+ this(jobConf, null, tezCounters, inputRecordCounter, context);
}
public MRReaderMapred(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
- TezCounter inputRecordCounter) throws IOException {
+ TezCounter inputRecordCounter, InputContext context) throws IOException {
+ super(context);
this.jobConf = jobConf;
this.tezCounters = tezCounters;
this.inputRecordCounter = inputRecordCounter;
@@ -113,9 +116,11 @@ public class MRReaderMapred extends MRReader {
boolean hasNext = recordReader.next(key, value);
if (hasNext) {
inputRecordCounter.increment(1);
+ notifyProgress();
} else {
hasCompletedProcessing();
completedProcessing = true;
+ notifyDone();
}
// The underlying reader does not throw InterruptedExceptions. Cannot convert to an
// IOInterruptedException without checking the interrupt flag on each request, which is also
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 12e5092..ec83bf5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -510,6 +510,7 @@ public class MROutput extends AbstractLogicalOutput {
oldRecordWriter.write(key, value);
}
outputRecordCounter.increment(1);
+ getContext().notifyProgress();
}
};
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index 50114b9..448b90c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -19,6 +19,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.LinkedList;
@@ -57,6 +59,7 @@ public class TestMRInput {
mrInput.start();
assertFalse(mrInput.getReader().next());
+ verify(inputContext, times(1)).notifyProgress();
List<Event> events = new LinkedList<>();
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 80e3e77..db5643e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
@@ -155,9 +157,11 @@ public class TestMultiMRInput {
input.handleEvents(eventList);
int readerCount = 0;
+ int recordCount = 0;
for (KeyValueReader reader : input.getKeyValueReaders()) {
readerCount++;
while (reader.next()) {
+ verify(inputContext, times(++recordCount) ).notifyProgress();
if (data1.size() == 0) {
fail("Found more records than expected");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
index 65f5ad0..dad18de 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.InputContext;
import org.junit.Before;
import org.junit.Test;
@@ -32,6 +33,9 @@ import java.io.IOException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class TestKVReadersWithMR {
@@ -60,12 +64,14 @@ public class TestKVReadersWithMR {
}
public void testWithSpecificNumberOfKV(int kvPairs) throws IOException {
- MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter);
+ InputContext mockContext = mock(InputContext.class);
+ MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
reader.recordReader = new DummyRecordReader(kvPairs);
int records = 0;
while (reader.next()) {
records++;
+ verify(mockContext, times(records)).notifyProgress();
}
assertTrue(kvPairs == records);
@@ -80,13 +86,15 @@ public class TestKVReadersWithMR {
}
public void testWithSpecificNumberOfKV_MapReduce(int kvPairs) throws IOException {
+ InputContext mockContext = mock(InputContext.class);
MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1,
- 10, 20, 30);
+ 10, 20, 30, mockContext);
reader.recordReader = new DummyRecordReaderMapReduce(kvPairs);
int records = 0;
while (reader.next()) {
records++;
+ verify(mockContext, times(records)).notifyProgress();
}
assertTrue(kvPairs == records);
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index b898fe0..0129a8b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -20,21 +20,52 @@ package org.apache.tez.mapreduce.output;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.TestUmbilical;
+import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.Ignore;
import org.junit.Test;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+
public class TestMROutput {
@@ -144,4 +175,134 @@ public class TestMROutput {
when(outputContext.getCounters()).thenReturn(new TezCounters());
return outputContext;
}
+
+ public static LogicalIOProcessorRuntimeTask createLogicalTask(
+ Configuration conf,
+ TezUmbilical umbilical, String dagName,
+ String vertexName) throws Exception {
+ ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName());
+ List<InputSpec> inputSpecs = Lists.newLinkedList();
+ List<OutputSpec> outputSpecs = Lists.newLinkedList();
+ outputSpecs.add(new OutputSpec("Null",
+ MROutput.createConfigBuilder(conf, TestOutputFormat.class).build().getOutputDescriptor(), 1));
+
+ TaskSpec taskSpec = new TaskSpec(
+ TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0),
+ dagName, vertexName, -1,
+ procDesc,
+ inputSpecs,
+ outputSpecs, null);
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+ LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ taskSpec,
+ 0,
+ conf,
+ new String[] {workDir.toString()},
+ umbilical,
+ null,
+ new HashMap<String, String>(),
+ HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+ Runtime.getRuntime().maxMemory(), true);
+ return task;
+ }
+
+ public static class TestOutputCommitter extends OutputCommitter {
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ }
+
+ public static class TestOutputFormat extends OutputFormat<String, String> {
+ public static class TestRecordWriter extends RecordWriter<String, String> {
+ Writer writer;
+ boolean doWrite;
+ TestRecordWriter(boolean write) throws IOException {
+ this.doWrite = write;
+ if (doWrite) {
+ File f = File.createTempFile("test", null);
+ f.deleteOnExit();
+ writer = new BufferedWriter(new FileWriter(f));
+ }
+ }
+
+ @Override
+ public void write(String key, String value) throws IOException, InterruptedException {
+ if (doWrite) {
+ writer.write(key);
+ writer.write(value);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TestRecordWriter(true);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TestOutputCommitter();
+ }
+ }
+
+ public static class TestProcessor extends SimpleProcessor {
+ public TestProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ KeyValueWriter writer = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+ for (int i=0; i<1000000; ++i) {
+ writer.write("key", "value");
+ }
+ }
+
+ }
+
+ @Ignore
+ @Test
+ public void testPerf() throws Exception {
+ LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(),
+ new TestUmbilical(), "dag", "vertex");
+ task.initialize();
+ task.run();
+ task.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 4b00c97..6b9b016 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -535,7 +535,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
MergedInputContext mergedInputContext =
new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(),
- groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs);
+ groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs, this);
List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
for (String groupVertex : groupInputSpec.getGroupVertices()) {
inputs.add(inputsMap.get(groupVertex));
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 23e57b1..529dde0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -54,7 +54,7 @@ public abstract class RuntimeTask {
private final AtomicBoolean taskDone;
private final TaskCounterUpdater counterUpdater;
private final TaskStatistics statistics;
- private volatile boolean progressNotified;
+ private final AtomicBoolean progressNotified = new AtomicBoolean(false);
protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
@@ -105,13 +105,12 @@ public abstract class RuntimeTask {
this.fatalErrorMessage = message;
}
- public void notifyProgressInvocation() {
- progressNotified = true;
+ public final void notifyProgressInvocation() {
+ progressNotified.lazySet(true);
}
public boolean getAndClearProgressNotification() {
- boolean retVal = progressNotified;
- progressNotified = false;
+ boolean retVal = progressNotified.getAndSet(false);
return retVal;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 74592c6..e35e332 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.InputReadyTracker;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.MergedInputContext;
@@ -38,10 +39,12 @@ public class TezMergedInputContextImpl implements MergedInputContext {
private final Map<String, MergedLogicalInput> groupInputsMap;
private final InputReadyTracker inputReadyTracker;
private final String[] workDirs;
+ private final LogicalIOProcessorRuntimeTask runtimeTask;
public TezMergedInputContextImpl(@Nullable UserPayload userPayload, String groupInputName,
Map<String, MergedLogicalInput> groupInputsMap,
- InputReadyTracker inputReadyTracker, String[] workDirs) {
+ InputReadyTracker inputReadyTracker, String[] workDirs,
+ LogicalIOProcessorRuntimeTask runtimeTask) {
checkNotNull(groupInputName, "groupInputName is null");
checkNotNull(groupInputsMap, "input-group map is null");
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
@@ -50,6 +53,7 @@ public class TezMergedInputContextImpl implements MergedInputContext {
this.userPayload = userPayload;
this.inputReadyTracker = inputReadyTracker;
this.workDirs = workDirs;
+ this.runtimeTask = runtimeTask;
}
@Override
@@ -67,4 +71,9 @@ public class TezMergedInputContextImpl implements MergedInputContext {
return Arrays.copyOf(workDirs, workDirs.length);
}
+ /**
+ * Records that the merged input made progress by flagging the owning runtime task.
+ * runtimeTask may be null (e.g. TestInputReadyTracker builds this context without a
+ * task), in which case the notification is a no-op instead of a NullPointerException.
+ */
+ @Override
+ public final void notifyProgress() {
+ if (runtimeTask != null) {
+ runtimeTask.notifyProgressInvocation();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 211f9d7..c12b334 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -174,7 +174,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
}
@Override
- public void notifyProgress() {
+ public final void notifyProgress() {
runtimeTask.notifyProgressInvocation();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index a77e38f..29c5023 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -156,8 +156,10 @@ public class TestInputReadyTracker {
group2Inputs.add(input4);
Map<String, MergedLogicalInput> mergedInputMap = new HashMap<String, MergedLogicalInput>();
- MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(null, "group1", mergedInputMap, inputReadyTracker, null);
- MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(null, "group2", mergedInputMap, inputReadyTracker, null);
+ MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(
+ null, "group1", mergedInputMap, inputReadyTracker, null, null);
+ MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(
+ null, "group2", mergedInputMap, inputReadyTracker, null, null);
AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(mergedInputContext1, group1Inputs);
AllMergedInputForTest group2 = new AllMergedInputForTest(mergedInputContext2, group2Inputs);
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index a8dd1b2..57bb121 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers;
import java.io.IOException;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +62,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
private final int ifileBufferSize;
private final TezCounter inputRecordCounter;
+ private final InputContext context;
private K key;
private V value;
@@ -75,10 +77,10 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
- TezCounter inputRecordCounter)
+ TezCounter inputRecordCounter, InputContext context)
throws IOException {
this.shuffleManager = shuffleManager;
-
+ this.context = context;
this.codec = codec;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
@@ -113,6 +115,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
public boolean next() throws IOException {
if (readNextFromCurrentReader()) {
inputRecordCounter.increment(1);
+ context.notifyProgress();
numRecordsRead++;
return true;
} else {
@@ -120,6 +123,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
while (nextInputExists) {
if(readNextFromCurrentReader()) {
inputRecordCounter.increment(1);
+ context.notifyProgress();
numRecordsRead++;
return true;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 818cfaa..431ba38 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -397,6 +397,7 @@ public class ShuffleUtils {
@Nullable long[] partitionStats) throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
+ context.notifyProgress();
if (finalMergeEnabled) {
Preconditions.checkArgument(isLastEvent, "Can not send multiple events when final merge is "
+ "enabled");
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index a7c1c59..b3e050a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -604,6 +604,7 @@ public class ShuffleManager implements FetcherCallback {
lock.unlock();
}
+ inputContext.notifyProgress();
boolean committed = false;
if (!completedInputSet.contains(inputIdentifier)) {
synchronized (completedInputSet) {
@@ -760,6 +761,7 @@ public class ShuffleManager implements FetcherCallback {
+ "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+ connectFailed);
failedShufflesCounter.increment(1);
+ inputContext.notifyProgress();
if (srcAttemptIdentifier == null) {
reportFatalError(null, "Received fetchFailure for an unknown src (null)");
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index fb9b243..61ff338 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -84,7 +84,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
private final LocalDirAllocator localDirAllocator;
private final TezTaskOutputFiles mapOutputFile;
- private final Progressable nullProgressable = new NullProgressable();
+ // Progress callback handed to TezMerger.merge / TezMerger.writeFile; each callback
+ // forwards to inputContext.notifyProgress(). inputContext is read when progress() is
+ // invoked (via the enclosing instance), not when this field is initialized.
+ private final Progressable progressable = new Progressable() {
+ @Override
+ public void progress() {
+ inputContext.notifyProgress();
+ }
+ };
private final Combiner combiner;
private final Set<MapOutput> inMemoryMergedMapOutputs =
@@ -624,6 +629,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
return;
}
+ inputContext.notifyProgress();
InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
@@ -650,8 +656,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, null, null, null, null);
- TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ progressable, null, null, null, null);
+ TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
writer.close();
LOG.info(inputContext.getSourceVertexName() +
@@ -697,6 +703,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
numMemToDiskMerges.increment(1);
+ inputContext.notifyProgress();
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
@@ -716,7 +723,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
// TODO Maybe track serialized vs deserialized bytes.
- // All disk writes done by this merge are overhead - due to the lac of
+ // All disk writes done by this merge are overhead - due to the lack of
// adequate memory to keep all segments in memory.
outputPath = mapOutputFile.getInputFileForWrite(
srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
@@ -742,13 +749,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
+ progressable, spilledRecordsCounter, null, additionalBytesRead, null);
// spilledRecordsCounter is tracking the number of keys that will be
// read from each of the segments being merged - which is essentially
// what will be written to disk.
if (null == combiner) {
- TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
// TODO Counters for Combine
runCombineProcessor(rIter, writer);
@@ -818,7 +825,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
return;
}
numDiskToDiskMerges.increment(1);
-
+ inputContext.notifyProgress();
+
long approxOutputSize = 0;
int bytesPerSum =
conf.getInt("io.bytes.per.checksum", 512);
@@ -879,13 +887,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
inputSegments,
ioSortFactor, tmpDir,
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, true, spilledRecordsCounter, null,
+ progressable, true, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
// TODO Maybe differentiate between data written because of Merges and
// the finalMerge (i.e. final mem available may be different from
// initial merge mem)
- TezMerger.writeFile(iter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(iter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
writer.close();
additionalBytesWritten.increment(writer.getCompressedLength());
} catch (IOException e) {
@@ -1010,6 +1018,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
}
+ inputContext.notifyProgress();
// merge config params
Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
@@ -1046,12 +1055,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
- memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
+ memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
spilledRecordsCounter, null, additionalBytesRead, null);
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null, null);
try {
- TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} catch (IOException e) {
if (null != outputPath) {
try {
@@ -1127,7 +1136,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
TezRawKeyValueIterator diskMerge = TezMerger.merge(
job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
- nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
+ progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
@@ -1138,7 +1147,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
// This is doing nothing but creating an iterator over the segments.
return TezMerger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
- comparator, nullProgressable, spilledRecordsCounter, null,
+ comparator, progressable, spilledRecordsCounter, null,
additionalBytesRead, null);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index b5dcd4c..de3b2cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -309,6 +309,7 @@ public class Shuffle implements ExceptionReporter {
// Finish the on-going merges...
TezRawKeyValueIterator kvIter = null;
+ inputContext.notifyProgress();
try {
kvIter = merger.close();
} catch (Throwable e) {
@@ -317,7 +318,8 @@ public class Shuffle implements ExceptionReporter {
throw new ShuffleError("Error while doing final merge ", e);
}
mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
-
+
+ inputContext.notifyProgress();
// Sanity check
synchronized (Shuffle.this) {
if (throwable.get() != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 22da46c..dcfb274 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -466,6 +466,7 @@ class ShuffleScheduler {
boolean isLocalFetch
) throws IOException {
+ inputContext.notifyProgress();
if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
if (!isLocalFetch) {
/**
@@ -626,7 +627,7 @@ class ShuffleScheduler {
boolean connectError,
boolean isLocalFetch) {
failedShuffleCounter.increment(1);
-
+ inputContext.notifyProgress();
int failures = incrementAndGetFailureAttempt(srcAttempt);
if (!isLocalFetch) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index aa521ea..7a2dc68 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -59,7 +59,6 @@ import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
import com.google.common.base.Preconditions;
@@ -72,6 +71,7 @@ public abstract class ExternalSorter {
spillFileIndexPaths.clear();
spillFilePaths.clear();
reportStatistics();
+ outputContext.notifyProgress();
}
public abstract void flush() throws IOException;
@@ -86,7 +86,13 @@ public abstract class ExternalSorter {
}
}
- protected final Progressable nullProgressable = new NullProgressable();
+ // Shared progress callback for subclasses (passed to IndexedSorter.sort, TezMerger.merge
+ // and TezMerger.writeFile); each callback forwards to outputContext.notifyProgress().
+ // outputContext is read when progress() is invoked, so declaring this field before
+ // outputContext is safe.
+ protected final Progressable progressable = new Progressable() {
+ @Override
+ public void progress() {
+ outputContext.notifyProgress();
+ }
+ };
+
protected final OutputContext outputContext;
protected final Combiner combiner;
protected final Partitioner partitioner;
@@ -298,6 +304,7 @@ public abstract class ExternalSorter {
protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
Writer writer) throws IOException {
try {
+ outputContext.notifyProgress();
combiner.combine(kvIter, writer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 2d53a2e..33a65d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -429,6 +429,7 @@ public class PipelinedSorter extends ExternalSorter {
span.kvmeta.put(valstart);
span.kvmeta.put(valend - valstart);
mapOutputRecordCounter.increment(1);
+ outputContext.notifyProgress();
mapOutputByteCounter.increment(valend - keystart);
}
@@ -545,6 +546,7 @@ public class PipelinedSorter extends ExternalSorter {
if (isThreadInterrupted()) {
return false;
}
+ outputContext.notifyProgress();
TezRawKeyValueIterator kvIter = merger.filter(i);
//write merged output to disk
long segmentStart = out.getPos();
@@ -611,6 +613,7 @@ public class PipelinedSorter extends ExternalSorter {
public void flush() throws IOException {
final String uniqueIdentifier = outputContext.getUniqueIdentifier();
+ outputContext.notifyProgress();
/**
* Possible that the thread got interrupted when flush was happening or when the flush was
* never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
@@ -698,6 +701,7 @@ public class PipelinedSorter extends ExternalSorter {
}
numShuffleChunks.setValue(numSpills);
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
+ // ??? why are events not being sent here?
return;
}
@@ -742,7 +746,7 @@ public class PipelinedSorter extends ExternalSorter {
segmentList, mergeFactor,
new Path(uniqueIdentifier),
(RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments, true,
+ progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
@@ -751,7 +755,7 @@ public class PipelinedSorter extends ExternalSorter {
new Writer(conf, finalOut, keyClass, valClass, codec,
spilledRecordsCounter, null, merger.needsRLE());
if (combiner == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, nullProgressable,
+ TezMerger.writeFile(kvIter, writer, progressable,
TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
runCombineProcessor(kvIter, writer);
@@ -893,7 +897,7 @@ public class PipelinedSorter extends ExternalSorter {
public SpanIterator sort(IndexedSorter sorter) {
long start = System.currentTimeMillis();
if(length() > 1) {
- sorter.sort(this, 0, length(), nullProgressable);
+ sorter.sort(this, 0, length(), progressable);
}
LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
+ "time=" + (System.currentTimeMillis() - start));
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index a833228..67da617 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -347,6 +347,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
+ outputContext.notifyProgress();
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
@@ -662,6 +663,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
@Override
public void flush() throws IOException {
LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output");
+ outputContext.notifyProgress();
if (Thread.currentThread().isInterrupted()) {
/**
* Possible that the thread got interrupted when flush was happening or when the flush was
@@ -710,6 +712,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
sameKeyCount = sameKey;
totalKeysCount = totalKeys;
}
+ outputContext.notifyProgress();
sortAndSpill(sameKeyCount, totalKeysCount);
}
} catch (InterruptedException e) {
@@ -835,7 +838,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
throws IOException, InterruptedException {
final int mstart = getMetaStart();
final int mend = getMetaEnd();
- sorter.sort(this, mstart, mend, nullProgressable);
+ sorter.sort(this, mstart, mend, progressable);
spill(mstart, mend, sameKeyCount, totalKeysCount);
}
@@ -1281,6 +1284,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
List<Segment> segmentList =
new ArrayList<Segment>(numSpills);
for(int i = 0; i < numSpills; i++) {
+ outputContext.notifyProgress();
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
@@ -1309,7 +1313,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
segmentList, mergeFactor,
new Path(taskIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments, true,
+ progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.
@@ -1320,7 +1324,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
spilledRecordsCounter, null);
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
- nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
runCombineProcessor(kvIter, writer);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 70b345f..ce410be 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -248,8 +248,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
throw new IOException("Exception during spill", new IOException(spillException));
}
if (skipBuffers) {
- //special case, where we have only one partition and pipeliing is disabled.
- writer.append(key, value);
+ //special case, where we have only one partition and pipelining is disabled.
+ writer.append(key, value); // ???? Why is outputrecordscounter not updated here?
+ outputContext.notifyProgress();
} else {
int partition = partitioner.getPartition(key, value, numPartitions);
write(key, value, partition);
@@ -319,6 +320,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
outputRecordsCounter.increment(1);
+ outputContext.notifyProgress();
currentBuffer.partitionPositions[partition] = metaStart;
currentBuffer.recordsPerPartition[partition]++;
currentBuffer.numRecords++;
@@ -407,6 +409,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
long compressedLength = 0;
for (int i = 0; i < numPartitions; i++) {
IFile.Writer writer = null;
+ outputContext.notifyProgress();
try {
long segmentStart = out.getPos();
if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
@@ -556,6 +559,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
boolean isLastSpill, String pathComponent, BitSet emptyPartitions)
throws IOException {
+ outputContext.notifyProgress();
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
@@ -710,6 +714,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
synchronized (spillInfoList) {
for (SpillInfo spillInfo : spillInfoList) {
+ outputContext.notifyProgress();
TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
if (indexRecord.getPartLength() == 0) {
// Skip empty partitions within a spill
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 45784d9..0b8ed21 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -53,6 +53,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
if (currentReaderIndex == getInputs().size()) {
hasCompletedProcessing();
completedProcessing = true;
+ getContext().notifyProgress();
return false;
}
try {
@@ -63,6 +64,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
}
currentReader = (KeyValueReader) reader;
currentReaderIndex++;
+ getContext().notifyProgress();
} catch (Exception e) {
// An InterruptedException is not expected here since this works off of
// underlying readers which take care of throwing IOInterruptedExceptions
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 27ff324..4a8969e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -54,6 +54,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
if (currentReaderIndex == getInputs().size()) {
hasCompletedProcessing();
completedProcessing = true;
+ getContext().notifyProgress();
return false;
}
try {
@@ -64,6 +65,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
}
currentReader = (KeyValuesReader) reader;
currentReaderIndex++;
+ getContext().notifyProgress();
} catch (Exception e) {
// An InterruptedException is not expected here since this works off of
// underlying readers which take care of throwing IOInterruptedExceptions
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 39cc471..5e367cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -229,6 +229,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
return new KeyValuesReader() {
@Override
public boolean next() throws IOException {
+ getContext().notifyProgress();
hasCompletedProcessing();
completedProcessing = true;
return false;
@@ -259,7 +260,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
synchronized(this) {
valuesIter = vIter;
}
- return new OrderedGroupedKeyValuesReader(valuesIter);
+ return new OrderedGroupedKeyValuesReader(valuesIter, getContext());
}
@Override
@@ -307,13 +308,16 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
private final ValuesIterator valuesIter;
+ private final InputContext context;
- OrderedGroupedKeyValuesReader(ValuesIterator valuesIter) {
+ OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext context) {
this.valuesIter = valuesIter;
+ this.context = context;
}
@Override
public boolean next() throws IOException {
+ context.notifyProgress();
return valuesIter.moveToNext();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 41ca7c9..2345bbb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -60,7 +60,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
*/
@Override
public KeyValuesReader getReader() throws Exception {
- return new OrderedGroupedMergedKeyValuesReader(getInputs());
+ return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext());
}
@Override
@@ -81,8 +81,10 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
private final ValuesIterable currentValues;
private KeyValuesReader nextKVReader;
private Object currentKey;
+ private final MergedInputContext context;
- public OrderedGroupedMergedKeyValuesReader(List<Input> inputs) throws Exception {
+ public OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext context)
+ throws Exception {
keyComparator = ((OrderedGroupedKVInput) inputs.get(0))
.getInputKeyComparator();
pQueue = new PriorityQueue<KeyValuesReader>(inputs.size(),
@@ -95,6 +97,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
}
}
currentValues = new ValuesIterable();
+ this.context = context;
}
private void advanceAndAddToQueue(KeyValuesReader kvsReadr)
@@ -122,6 +125,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
finishedReaders.clear();
nextKVReader = pQueue.poll();
+ context.notifyProgress();
if (nextKVReader != null) {
currentKey = nextKVReader.getCurrentKey();
currentValues.moveToNext();
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index fad164f..dbbe23f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -169,6 +169,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
return new KeyValueReader() {
@Override
public boolean next() throws IOException {
+ getContext().notifyProgress();
hasCompletedProcessing();
completedProcessing = true;
return false;
@@ -241,7 +242,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength)
throws IOException {
return new UnorderedKVReader(shuffleManager, conf, codec, ifileReadAheadEnabled,
- ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
+ ifileReadAheadLength, ifileBufferSize, inputRecordCounter, getContext());
}
private static final Set<String> confKeys = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 80bdc42..c49a423 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -125,7 +126,7 @@ public class TestUnorderedKVReader {
}).when(manager).getNextInput();
unorderedKVReader = new UnorderedKVReader<Text, Text>(manager,
- defaultConf, null, false, -1, -1, inputRecords);
+ defaultConf, null, false, -1, -1, inputRecords, mock(InputContext.class));
}
private void createIFile(Path path, int recordCount) throws IOException {
@@ -177,7 +178,7 @@ public class TestUnorderedKVReader {
TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
UnorderedKVReader<Text, Text> reader =
new UnorderedKVReader<Text, Text>(shuffleManager, defaultConf, null, false, -1, -1,
- inputRecords);
+ inputRecords, mock(InputContext.class));
try {
reader.next();
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 214ec45..b8f99de 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -328,6 +329,8 @@ public class TestMergeManager {
assertEquals(m1Prefix, m2Prefix);
assertNotEquals(m1Prefix, m3Prefix);
assertNotEquals(m2Prefix, m3Prefix);
+
+ verify(inputContext, atLeastOnce()).notifyProgress();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 3fe540c..1a6c3be 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -152,6 +152,8 @@ public class TestShuffleScheduler {
scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
scheduler.freeHost(mapHosts[i]);
}
+
+ verify(inputContext, atLeast(numInputs)).notifyProgress();
// Ensure the executor exits, and without an error.
executorFuture.get();
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 2cebea4..70819e5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.runtime.library.common.sort.impl;
import com.google.common.collect.Maps;
@@ -39,30 +57,13 @@ import java.util.TreeMap;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
public class TestPipelinedSorter {
private static FileSystem localFs = null;
private static Path workDir = null;
@@ -417,6 +418,7 @@ public class TestPipelinedSorter {
//Verify dataset
verifyData(reader);
reader.close();
+ verify(outputContext, atLeastOnce()).notifyProgress();
}
private void verifyCounters(PipelinedSorter sorter, OutputContext context) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index b531464..e0374a3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -429,6 +430,7 @@ public class TestDefaultSorter {
TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+ verify(context, atLeastOnce()).notifyProgress();
}
private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 1a10eb8..e7a2125 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -25,7 +25,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -585,6 +584,8 @@ public class TestUnorderedPartitionedKVWriter {
assertTrue(eventProto.hasPathComponent());
}
+ verify(outputContext, atLeast(1)).notifyProgress();
+
// Verify if all spill files are available.
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
@@ -784,6 +785,7 @@ public class TestUnorderedPartitionedKVWriter {
expectedValues.remove(i);
}
assertEquals(0, expectedValues.size());
+ verify(outputContext, atLeast(1)).notifyProgress();
}
private static String createRandomString(int size) {