You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/15 20:47:56 UTC
tez git commit: TEZ-2918. Make progress notifications in IOs (bikas)
(cherry picked from commit 5ec498d8f5b6cecf4eaee8f995ea2cf9ca2acfcc)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 a625b9f02 -> 3dca7d3f7
TEZ-2918. Make progress notifications in IOs (bikas)
(cherry picked from commit 5ec498d8f5b6cecf4eaee8f995ea2cf9ca2acfcc)
Conflicts:
CHANGES.txt
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3dca7d3f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3dca7d3f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3dca7d3f
Branch: refs/heads/branch-0.7
Commit: 3dca7d3f78ab8754d1fba4d511346f97d735a847
Parents: a625b9f
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 13 05:54:09 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Dec 14 16:10:55 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/runtime/api/MergedInputContext.java | 5 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +-
.../org/apache/tez/mapreduce/input/MRInput.java | 10 +-
.../tez/mapreduce/input/MultiMRInput.java | 4 +-
.../org/apache/tez/mapreduce/lib/MRReader.java | 16 ++
.../tez/mapreduce/lib/MRReaderMapReduce.java | 10 +-
.../tez/mapreduce/lib/MRReaderMapred.java | 11 +-
.../apache/tez/mapreduce/output/MROutput.java | 1 +
.../apache/tez/mapreduce/input/TestMRInput.java | 3 +
.../tez/mapreduce/input/TestMultiMRInput.java | 4 +
.../tez/mapreduce/lib/TestKVReadersWithMR.java | 12 +-
.../tez/mapreduce/output/TestMROutput.java | 161 +++++++++++++++++++
.../runtime/LogicalIOProcessorRuntimeTask.java | 2 +-
.../org/apache/tez/runtime/RuntimeTask.java | 9 +-
.../api/impl/TezMergedInputContextImpl.java | 11 +-
.../runtime/api/impl/TezTaskContextImpl.java | 2 +-
.../tez/runtime/TestInputReadyTracker.java | 6 +-
.../common/readers/UnorderedKVReader.java | 8 +-
.../library/common/shuffle/ShuffleUtils.java | 1 +
.../common/shuffle/impl/ShuffleManager.java | 2 +
.../shuffle/orderedgrouped/MergeManager.java | 35 ++--
.../common/shuffle/orderedgrouped/Shuffle.java | 4 +-
.../orderedgrouped/ShuffleScheduler.java | 3 +-
.../common/sort/impl/ExternalSorter.java | 11 +-
.../common/sort/impl/PipelinedSorter.java | 10 +-
.../common/sort/impl/dflt/DefaultSorter.java | 10 +-
.../writers/UnorderedPartitionedKVWriter.java | 9 +-
.../input/ConcatenatedMergedKeyValueInput.java | 2 +
.../input/ConcatenatedMergedKeyValuesInput.java | 2 +
.../library/input/OrderedGroupedKVInput.java | 8 +-
.../input/OrderedGroupedMergedKVInput.java | 8 +-
.../runtime/library/input/UnorderedKVInput.java | 3 +-
.../common/readers/TestUnorderedKVReader.java | 3 +-
.../orderedgrouped/TestMergeManager.java | 5 +
.../common/sort/impl/TestPipelinedSorter.java | 37 +++--
.../sort/impl/dflt/TestDefaultSorter.java | 2 +
.../TestUnorderedPartitionedKVWriter.java | 4 +-
.../input/TestSortedGroupedMergedInput.java | 26 +--
39 files changed, 382 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f9868c..398dc6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2918. Make progress notifications in IOs
TEZ-2979. FlakyTest: org.apache.tez.history.TestHistoryParser.
TEZ-2952. NPE in TestOnFileUnorderedKVOutput
TEZ-808. Handle task attempts that are not making progress
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
index 41c519b..65bb087 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
@@ -42,6 +42,11 @@ public interface MergedInputContext {
public void inputIsReady();
/**
+ * Inform the framework that progress has been made
+ */
+ public void notifyProgress();
+
+ /**
* Get the work directories for the Input
* @return an array of work dirs
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 2449cf3..98b1681 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -193,6 +193,7 @@ public class TaskAttemptImpl implements TaskAttempt,
org.apache.tez.runtime.api.impl.TaskStatistics statistics;
long lastNotifyProgressTimestamp = 0;
+ private final long hungIntervalMax;
// Used to store locality information when
Set<String> taskHosts = new HashSet<String>();
@@ -500,6 +501,10 @@ public class TaskAttemptImpl implements TaskAttempt,
this.taskResource = resource;
this.containerContext = containerContext;
this.leafVertex = leafVertex;
+ this.hungIntervalMax = conf.getLong(
+ TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS,
+ TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
+
}
@@ -1396,14 +1401,11 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.lastNotifyProgressTimestamp = ta.clock.getTime();
} else {
long currTime = ta.clock.getTime();
- long hungIntervalMax = ta.conf.getLong(
- TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
- if (hungIntervalMax > 0 &&
- currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) {
+ if (ta.hungIntervalMax > 0 &&
+ currTime - ta.lastNotifyProgressTimestamp > ta.hungIntervalMax) {
// task is hung
String diagnostics = "Attempt failed because it appears to make no progress for " +
- hungIntervalMax + "ms";
+ ta.hungIntervalMax + "ms";
LOG.info(diagnostics + " " + ta.getID());
// send event that will fail this attempt
ta.sendEvent(
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 93161cb..b68d135 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -461,9 +461,10 @@ public class MRInput extends MRInputBase {
mrReader = new MRReaderMapReduce(jobConf, getContext().getCounters(), inputRecordCounter,
getContext().getApplicationId().getClusterTimestamp(), getContext()
.getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext()
- .getTaskIndex(), getContext().getTaskAttemptNumber());
+ .getTaskIndex(), getContext().getTaskAttemptNumber(), getContext());
} else {
- mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter);
+ mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter,
+ getContext());
}
} else {
TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
@@ -477,14 +478,14 @@ public class MRInput extends MRInputBase {
mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
- getContext().getTaskIndex(), getContext().getTaskAttemptNumber());
+ getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), getContext());
} else {
org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
.getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
.findCounter(TaskCounter.SPLIT_RAW_BYTES));
mrReader =
new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(),
- inputRecordCounter);
+ inputRecordCounter, getContext());
}
}
} finally {
@@ -508,6 +509,7 @@ public class MRInput extends MRInputBase {
return new KeyValueReader() {
@Override
public boolean next() throws IOException {
+ getContext().notifyProgress();
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 4a792dc..2b60f29 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -177,7 +177,7 @@ public class MultiMRInput extends MRInputBase {
getContext().getCounters(), inputRecordCounter, getContext().getApplicationId()
.getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
.getApplicationId().getId(), getContext().getTaskIndex(), getContext()
- .getTaskAttemptNumber());
+ .getTaskAttemptNumber(), getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split);
@@ -186,7 +186,7 @@ public class MultiMRInput extends MRInputBase {
} else {
split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
- getContext().getCounters(), inputRecordCounter);
+ getContext().getCounters(), inputRecordCounter, getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
split.getClass().getName() + ", OldSplit: " + split);
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
index 8a20827..aa35fec 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
@@ -21,14 +21,30 @@ package org.apache.tez.mapreduce.lib;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
@Private
public abstract class MRReader extends KeyValueReader {
+
+ private final InputContext context;
+
public abstract void setSplit(Object split) throws IOException;
public abstract boolean isSetup();
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
public abstract Object getSplit();
public abstract Object getRecordReader();
+
+ public MRReader(InputContext context) {
+ this.context = context;
+ }
+
+ protected final void notifyProgress() {
+ context.notifyProgress();
+ }
+
+ protected final void notifyDone() {
+ context.notifyProgress();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 0495751..c6f174a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib;
import java.io.IOException;
+import org.apache.tez.runtime.api.InputContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.mapred.JobConf;
@@ -50,15 +51,16 @@ public class MRReaderMapReduce extends MRReader {
private boolean setupComplete = false;
public MRReaderMapReduce(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter,
- long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber)
+ long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber, InputContext context)
throws IOException {
this(jobConf, null, tezCounters, inputRecordCounter, clusterId, vertexIndex, appId, taskIndex,
- taskAttemptNumber);
+ taskAttemptNumber, context);
}
public MRReaderMapReduce(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
TezCounter inputRecordCounter, long clusterId, int vertexIndex, int appId, int taskIndex,
- int taskAttemptNumber) throws IOException {
+ int taskAttemptNumber, InputContext context) throws IOException {
+ super(context);
this.inputRecordCounter = inputRecordCounter;
this.taskAttemptContext = new TaskAttemptContextImpl(jobConf, tezCounters, clusterId,
vertexIndex, appId, taskIndex, taskAttemptNumber, true, null);
@@ -120,9 +122,11 @@ public class MRReaderMapReduce extends MRReader {
}
if (hasNext) {
inputRecordCounter.increment(1);
+ notifyProgress();
} else {
hasCompletedProcessing();
completedProcessing = true;
+ notifyDone();
}
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 366e7a7..b0c046f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -33,6 +33,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.InputContext;
import com.google.common.base.Preconditions;
@@ -56,13 +57,15 @@ public class MRReaderMapred extends MRReader {
private boolean setupComplete = false;
- public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter)
+ public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter,
+ InputContext context)
throws IOException {
- this(jobConf, null, tezCounters, inputRecordCounter);
+ this(jobConf, null, tezCounters, inputRecordCounter, context);
}
public MRReaderMapred(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
- TezCounter inputRecordCounter) throws IOException {
+ TezCounter inputRecordCounter, InputContext context) throws IOException {
+ super(context);
this.jobConf = jobConf;
this.tezCounters = tezCounters;
this.inputRecordCounter = inputRecordCounter;
@@ -113,9 +116,11 @@ public class MRReaderMapred extends MRReader {
boolean hasNext = recordReader.next(key, value);
if (hasNext) {
inputRecordCounter.increment(1);
+ notifyProgress();
} else {
hasCompletedProcessing();
completedProcessing = true;
+ notifyDone();
}
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 7136482..0503b22 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -509,6 +509,7 @@ public class MROutput extends AbstractLogicalOutput {
oldRecordWriter.write(key, value);
}
outputRecordCounter.increment(1);
+ getContext().notifyProgress();
}
};
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index 61b6f81..50d7aa0 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -19,6 +19,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.LinkedList;
@@ -57,6 +59,7 @@ public class TestMRInput {
mrInput.start();
assertFalse(mrInput.getReader().next());
+ verify(inputContext, times(1)).notifyProgress();
List<Event> events = new LinkedList<Event>();
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index f390d8a..59c459f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
@@ -155,9 +157,11 @@ public class TestMultiMRInput {
input.handleEvents(eventList);
int readerCount = 0;
+ int recordCount = 0;
for (KeyValueReader reader : input.getKeyValueReaders()) {
readerCount++;
while (reader.next()) {
+ verify(inputContext, times(++recordCount) ).notifyProgress();
if (data1.size() == 0) {
fail("Found more records than expected");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
index 65f5ad0..dad18de 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.InputContext;
import org.junit.Before;
import org.junit.Test;
@@ -32,6 +33,9 @@ import java.io.IOException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class TestKVReadersWithMR {
@@ -60,12 +64,14 @@ public class TestKVReadersWithMR {
}
public void testWithSpecificNumberOfKV(int kvPairs) throws IOException {
- MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter);
+ InputContext mockContext = mock(InputContext.class);
+ MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
reader.recordReader = new DummyRecordReader(kvPairs);
int records = 0;
while (reader.next()) {
records++;
+ verify(mockContext, times(records)).notifyProgress();
}
assertTrue(kvPairs == records);
@@ -80,13 +86,15 @@ public class TestKVReadersWithMR {
}
public void testWithSpecificNumberOfKV_MapReduce(int kvPairs) throws IOException {
+ InputContext mockContext = mock(InputContext.class);
MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1,
- 10, 20, 30);
+ 10, 20, 30, mockContext);
reader.recordReader = new DummyRecordReaderMapReduce(kvPairs);
int records = 0;
while (reader.next()) {
records++;
+ verify(mockContext, times(records)).notifyProgress();
}
assertTrue(kvPairs == records);
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index b898fe0..45a24cb 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -20,21 +20,52 @@ package org.apache.tez.mapreduce.output;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.TestUmbilical;
+import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.Ignore;
import org.junit.Test;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+
public class TestMROutput {
@@ -144,4 +175,134 @@ public class TestMROutput {
when(outputContext.getCounters()).thenReturn(new TezCounters());
return outputContext;
}
+
+ public static LogicalIOProcessorRuntimeTask createLogicalTask(
+ Configuration conf,
+ TezUmbilical umbilical, String dagName,
+ String vertexName) throws Exception {
+ ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName());
+ List<InputSpec> inputSpecs = Lists.newLinkedList();
+ List<OutputSpec> outputSpecs = Lists.newLinkedList();
+ outputSpecs.add(new OutputSpec("Null",
+ MROutput.createConfigBuilder(conf, TestOutputFormat.class).build().getOutputDescriptor(), 1));
+
+ TaskSpec taskSpec = new TaskSpec(
+ TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0),
+ dagName, vertexName, -1,
+ procDesc,
+ inputSpecs,
+ outputSpecs, null);
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+ LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ taskSpec,
+ 0,
+ conf,
+ new String[] {workDir.toString()},
+ umbilical,
+ null,
+ new HashMap<String, String>(),
+ HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+ Runtime.getRuntime().maxMemory());
+ return task;
+ }
+
+ public static class TestOutputCommitter extends OutputCommitter {
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ }
+
+ public static class TestOutputFormat extends OutputFormat<String, String> {
+ public static class TestRecordWriter extends RecordWriter<String, String> {
+ Writer writer;
+ boolean doWrite;
+ TestRecordWriter(boolean write) throws IOException {
+ this.doWrite = write;
+ if (doWrite) {
+ File f = File.createTempFile("test", null);
+ f.deleteOnExit();
+ writer = new BufferedWriter(new FileWriter(f));
+ }
+ }
+
+ @Override
+ public void write(String key, String value) throws IOException, InterruptedException {
+ if (doWrite) {
+ writer.write(key);
+ writer.write(value);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TestRecordWriter(true);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TestOutputCommitter();
+ }
+ }
+
+ public static class TestProcessor extends SimpleProcessor {
+ public TestProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ KeyValueWriter writer = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+ for (int i=0; i<1000000; ++i) {
+ writer.write("key", "value");
+ }
+ }
+
+ }
+
+ @Ignore
+ @Test
+ public void testPerf() throws Exception {
+ LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(),
+ new TestUmbilical(), "dag", "vertex");
+ task.initialize();
+ task.run();
+ task.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index dde3548..f393769 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -528,7 +528,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
MergedInputContext mergedInputContext =
new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(),
- groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs);
+ groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs, this);
List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
for (String groupVertex : groupInputSpec.getGroupVertices()) {
inputs.add(inputsMap.get(groupVertex));
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index a0820c7..70dc867 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -54,7 +54,7 @@ public abstract class RuntimeTask {
private final AtomicBoolean taskDone;
private final TaskCounterUpdater counterUpdater;
private final TaskStatistics statistics;
- private volatile boolean progressNotified;
+ private final AtomicBoolean progressNotified = new AtomicBoolean(false);
protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical, String pid) {
@@ -97,13 +97,12 @@ public abstract class RuntimeTask {
this.fatalErrorMessage = message;
}
- public void notifyProgressInvocation() {
- progressNotified = true;
+ public final void notifyProgressInvocation() {
+ progressNotified.lazySet(true);
}
public boolean getAndClearProgressNotification() {
- boolean retVal = progressNotified;
- progressNotified = false;
+ boolean retVal = progressNotified.getAndSet(false);
return retVal;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 74592c6..e35e332 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.InputReadyTracker;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.MergedInputContext;
@@ -38,10 +39,12 @@ public class TezMergedInputContextImpl implements MergedInputContext {
private final Map<String, MergedLogicalInput> groupInputsMap;
private final InputReadyTracker inputReadyTracker;
private final String[] workDirs;
+ private final LogicalIOProcessorRuntimeTask runtimeTask;
public TezMergedInputContextImpl(@Nullable UserPayload userPayload, String groupInputName,
Map<String, MergedLogicalInput> groupInputsMap,
- InputReadyTracker inputReadyTracker, String[] workDirs) {
+ InputReadyTracker inputReadyTracker, String[] workDirs,
+ LogicalIOProcessorRuntimeTask runtimeTask) {
checkNotNull(groupInputName, "groupInputName is null");
checkNotNull(groupInputsMap, "input-group map is null");
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
@@ -50,6 +53,7 @@ public class TezMergedInputContextImpl implements MergedInputContext {
this.userPayload = userPayload;
this.inputReadyTracker = inputReadyTracker;
this.workDirs = workDirs;
+ this.runtimeTask = runtimeTask;
}
@Override
@@ -67,4 +71,9 @@ public class TezMergedInputContextImpl implements MergedInputContext {
return Arrays.copyOf(workDirs, workDirs.length);
}
+ @Override
+ public final void notifyProgress() {
+ runtimeTask.notifyProgressInvocation();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 211f9d7..c12b334 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -174,7 +174,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
}
@Override
- public void notifyProgress() {
+ public final void notifyProgress() {
runtimeTask.notifyProgressInvocation();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index a77e38f..29c5023 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -156,8 +156,10 @@ public class TestInputReadyTracker {
group2Inputs.add(input4);
Map<String, MergedLogicalInput> mergedInputMap = new HashMap<String, MergedLogicalInput>();
- MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(null, "group1", mergedInputMap, inputReadyTracker, null);
- MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(null, "group2", mergedInputMap, inputReadyTracker, null);
+ MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(
+ null, "group1", mergedInputMap, inputReadyTracker, null, null);
+ MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(
+ null, "group2", mergedInputMap, inputReadyTracker, null, null);
AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(mergedInputContext1, group1Inputs);
AllMergedInputForTest group2 = new AllMergedInputForTest(mergedInputContext2, group2Inputs);
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index b14a461..a7fd7c8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers;
import java.io.IOException;
+import org.apache.tez.runtime.api.InputContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -60,6 +61,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
private final int ifileBufferSize;
private final TezCounter inputRecordCounter;
+ private final InputContext context;
private K key;
private V value;
@@ -74,10 +76,10 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
- TezCounter inputRecordCounter)
+ TezCounter inputRecordCounter, InputContext context)
throws IOException {
this.shuffleManager = shuffleManager;
-
+ this.context = context;
this.codec = codec;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
@@ -112,6 +114,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
public boolean next() throws IOException {
if (readNextFromCurrentReader()) {
inputRecordCounter.increment(1);
+ context.notifyProgress();
numRecordsRead++;
return true;
} else {
@@ -119,6 +122,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
while (nextInputExists) {
if(readNextFromCurrentReader()) {
inputRecordCounter.increment(1);
+ context.notifyProgress();
numRecordsRead++;
return true;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 8aca3af..48d7eb2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -419,6 +419,7 @@ public class ShuffleUtils {
@Nullable long[] partitionStats) throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
+ context.notifyProgress();
if (finalMergeEnabled) {
Preconditions.checkArgument(isLastEvent, "Can not send multiple events when final merge is "
+ "enabled");
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 99fc18a..151cac4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -604,6 +604,7 @@ public class ShuffleManager implements FetcherCallback {
lock.unlock();
}
+ inputContext.notifyProgress();
boolean committed = false;
if (!completedInputSet.contains(inputIdentifier)) {
synchronized (completedInputSet) {
@@ -760,6 +761,7 @@ public class ShuffleManager implements FetcherCallback {
+ "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+ connectFailed);
failedShufflesCounter.increment(1);
+ inputContext.notifyProgress();
if (srcAttemptIdentifier == null) {
reportFatalError(null, "Received fetchFailure for an unknown src (null)");
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9516f27..9f5287d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -82,7 +82,12 @@ public class MergeManager {
private final LocalDirAllocator localDirAllocator;
private final TezTaskOutputFiles mapOutputFile;
- private final Progressable nullProgressable = new NullProgressable();
+ private final Progressable progressable = new Progressable() {
+ @Override
+ public void progress() {
+ inputContext.notifyProgress();
+ }
+ };
private final Combiner combiner;
private final Set<MapOutput> inMemoryMergedMapOutputs =
@@ -567,6 +572,7 @@ public class MergeManager {
return;
}
+ inputContext.notifyProgress();
InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
@@ -593,8 +599,8 @@ public class MergeManager {
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, null, null, null, null);
- TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ progressable, null, null, null, null);
+ TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
writer.close();
LOG.info(inputContext.getSourceVertexName() +
@@ -625,6 +631,7 @@ public class MergeManager {
}
numMemToDiskMerges.increment(1);
+ inputContext.notifyProgress();
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
@@ -644,7 +651,7 @@ public class MergeManager {
// TODO Maybe track serialized vs deserialized bytes.
- // All disk writes done by this merge are overhead - due to the lac of
+ // All disk writes done by this merge are overhead - due to the lack of
// adequate memory to keep all segments in memory.
Path outputPath = mapOutputFile.getInputFileForWrite(
srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
@@ -670,13 +677,13 @@ public class MergeManager {
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
+ progressable, spilledRecordsCounter, null, additionalBytesRead, null);
// spilledRecordsCounter is tracking the number of keys that will be
// read from each of the segments being merged - which is essentially
// what will be written to disk.
if (null == combiner) {
- TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
// TODO Counters for Combine
runCombineProcessor(rIter, writer);
@@ -729,7 +736,8 @@ public class MergeManager {
return;
}
numDiskToDiskMerges.increment(1);
-
+ inputContext.notifyProgress();
+
long approxOutputSize = 0;
int bytesPerSum =
conf.getInt("io.bytes.per.checksum", 512);
@@ -790,13 +798,13 @@ public class MergeManager {
inputSegments,
ioSortFactor, tmpDir,
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, true, spilledRecordsCounter, null,
+ progressable, true, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
// TODO Maybe differentiate between data written because of Merges and
// the finalMerge (i.e. final mem available may be different from
// initial merge mem)
- TezMerger.writeFile(iter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(iter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
writer.close();
additionalBytesWritten.increment(writer.getCompressedLength());
} catch (IOException e) {
@@ -907,6 +915,7 @@ public class MergeManager {
}
}
+ inputContext.notifyProgress();
// merge config params
Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
@@ -943,12 +952,12 @@ public class MergeManager {
mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
- memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
+ memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
spilledRecordsCounter, null, additionalBytesRead, null);
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null, null);
try {
- TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} catch (IOException e) {
if (null != outputPath) {
try {
@@ -1024,7 +1033,7 @@ public class MergeManager {
TezRawKeyValueIterator diskMerge = TezMerger.merge(
job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
- nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
+ progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
@@ -1035,7 +1044,7 @@ public class MergeManager {
// This is doing nothing but creating an iterator over the segments.
return TezMerger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
- comparator, nullProgressable, spilledRecordsCounter, null,
+ comparator, progressable, spilledRecordsCounter, null,
additionalBytesRead, null);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index cad820f..b0b83f0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -372,13 +372,15 @@ public class Shuffle implements ExceptionReporter {
// Finish the on-going merges...
TezRawKeyValueIterator kvIter = null;
+ inputContext.notifyProgress();
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
-
+
+ inputContext.notifyProgress();
// Sanity check
synchronized (Shuffle.this) {
if (throwable.get() != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 4a71268..dc83eae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -311,6 +311,7 @@ class ShuffleScheduler {
boolean isLocalFetch
) throws IOException {
+ inputContext.notifyProgress();
if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
if (!isLocalFetch) {
/**
@@ -471,7 +472,7 @@ class ShuffleScheduler {
boolean isLocalFetch
) {
failedShuffleCounter.increment(1);
-
+ inputContext.notifyProgress();
int failures = incrementAndGetFailureAttempt(srcAttempt);
if (!isLocalFetch) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index ac5acb8..31f1862 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -58,7 +58,6 @@ import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
import com.google.common.base.Preconditions;
@@ -71,6 +70,7 @@ public abstract class ExternalSorter {
spillFileIndexPaths.clear();
spillFilePaths.clear();
reportStatistics();
+ outputContext.notifyProgress();
}
public abstract void flush() throws IOException;
@@ -85,7 +85,13 @@ public abstract class ExternalSorter {
}
}
- protected final Progressable nullProgressable = new NullProgressable();
+ protected final Progressable progressable = new Progressable() {
+ @Override
+ public void progress() {
+ outputContext.notifyProgress();
+ }
+ };
+
protected final OutputContext outputContext;
protected final Combiner combiner;
protected final Partitioner partitioner;
@@ -292,6 +298,7 @@ public abstract class ExternalSorter {
protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
Writer writer) throws IOException {
try {
+ outputContext.notifyProgress();
combiner.combine(kvIter, writer);
} catch (InterruptedException e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 9708d7c..8e5c598 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -355,6 +355,7 @@ public class PipelinedSorter extends ExternalSorter {
span.kvmeta.put(valstart);
span.kvmeta.put(valend - valstart);
mapOutputRecordCounter.increment(1);
+ outputContext.notifyProgress();
mapOutputByteCounter.increment(valend - keystart);
}
@@ -453,6 +454,7 @@ public class PipelinedSorter extends ExternalSorter {
merger.ready(); // wait for all the future results from sort threads
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
+ outputContext.notifyProgress();
TezRawKeyValueIterator kvIter = merger.filter(i);
//write merged output to disk
long segmentStart = out.getPos();
@@ -505,6 +507,8 @@ public class PipelinedSorter extends ExternalSorter {
public void flush() throws IOException {
final String uniqueIdentifier = outputContext.getUniqueIdentifier();
+ outputContext.notifyProgress();
+
LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output");
span.end();
merger.add(span.sort(sorter));
@@ -621,7 +625,7 @@ public class PipelinedSorter extends ExternalSorter {
segmentList, mergeFactor,
new Path(uniqueIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments, true,
+ progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.
@@ -631,7 +635,7 @@ public class PipelinedSorter extends ExternalSorter {
new Writer(conf, finalOut, keyClass, valClass, codec,
spilledRecordsCounter, null, merger.needsRLE());
if (combiner == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ TezMerger.writeFile(kvIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
runCombineProcessor(kvIter, writer);
}
@@ -762,7 +766,7 @@ public class PipelinedSorter extends ExternalSorter {
public SpanIterator sort(IndexedSorter sorter) {
long start = System.currentTimeMillis();
if(length() > 1) {
- sorter.sort(this, 0, length(), nullProgressable);
+ sorter.sort(this, 0, length(), progressable);
}
LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
+ "time=" + (System.currentTimeMillis() - start));
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 6c15a5d..6fe98b5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -343,6 +343,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
+ outputContext.notifyProgress();
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
@@ -638,6 +639,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
@Override
public void flush() throws IOException {
LOG.info("Starting flush of map output");
+ outputContext.notifyProgress();
spillLock.lock();
try {
while (spillInProgress) {
@@ -669,6 +671,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
sameKeyCount = sameKey;
totalKeysCount = totalKeys;
}
+ outputContext.notifyProgress();
sortAndSpill(sameKeyCount, totalKeysCount);
}
} catch (InterruptedException e) {
@@ -792,7 +795,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
throws IOException, InterruptedException {
final int mstart = getMetaStart();
final int mend = getMetaEnd();
- sorter.sort(this, mstart, mend, nullProgressable);
+ sorter.sort(this, mstart, mend, progressable);
spill(mstart, mend, sameKeyCount, totalKeysCount);
}
@@ -1233,6 +1236,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
List<Segment> segmentList =
new ArrayList<Segment>(numSpills);
for(int i = 0; i < numSpills; i++) {
+ outputContext.notifyProgress();
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
@@ -1261,7 +1265,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
segmentList, mergeFactor,
new Path(taskIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments, true,
+ progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.
@@ -1272,7 +1276,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
spilledRecordsCounter, null);
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
- nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
runCombineProcessor(kvIter, writer);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 4de167a..08dd130 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -247,8 +247,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
throw new IOException("Exception during spill", new IOException(spillException));
}
if (skipBuffers) {
- //special case, where we have only one partition and pipeliing is disabled.
- writer.append(key, value);
+ //special case, where we have only one partition and pipelining is disabled.
+ writer.append(key, value); // TODO: Why is outputRecordsCounter not updated here?
+ outputContext.notifyProgress();
} else {
int partition = partitioner.getPartition(key, value, numPartitions);
write(key, value, partition);
@@ -318,6 +319,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
outputRecordsCounter.increment(1);
+ outputContext.notifyProgress();
currentBuffer.partitionPositions[partition] = metaStart;
currentBuffer.recordsPerPartition[partition]++;
currentBuffer.numRecords++;
@@ -406,6 +408,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
long compressedLength = 0;
for (int i = 0; i < numPartitions; i++) {
IFile.Writer writer = null;
+ outputContext.notifyProgress();
try {
long segmentStart = out.getPos();
if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
@@ -555,6 +558,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
boolean isLastSpill, String pathComponent, BitSet emptyPartitions)
throws IOException {
+ outputContext.notifyProgress();
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
@@ -709,6 +713,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
synchronized (spillInfoList) {
for (SpillInfo spillInfo : spillInfoList) {
+ outputContext.notifyProgress();
TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
if (indexRecord.getPartLength() == 0) {
// Skip empty partitions within a spill
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 14b1e2c..728121a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -53,6 +53,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
if (currentReaderIndex == getInputs().size()) {
hasCompletedProcessing();
completedProcessing = true;
+ getContext().notifyProgress();
return false;
}
try {
@@ -63,6 +64,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
}
currentReader = (KeyValueReader) reader;
currentReaderIndex++;
+ getContext().notifyProgress();
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 2a1e4c6..f9a5959 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -54,6 +54,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
if (currentReaderIndex == getInputs().size()) {
hasCompletedProcessing();
completedProcessing = true;
+ getContext().notifyProgress();
return false;
}
try {
@@ -64,6 +65,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
}
currentReader = (KeyValuesReader) reader;
currentReaderIndex++;
+ getContext().notifyProgress();
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 02c4176..278c8d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -220,6 +220,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
return new KeyValuesReader() {
@Override
public boolean next() throws IOException {
+ getContext().notifyProgress();
hasCompletedProcessing();
completedProcessing = true;
return false;
@@ -250,7 +251,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
synchronized(this) {
valuesIter = vIter;
}
- return new OrderedGroupedKeyValuesReader(valuesIter);
+ return new OrderedGroupedKeyValuesReader(valuesIter, getContext());
}
@Override
@@ -298,13 +299,16 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
private final ValuesIterator valuesIter;
+ private final InputContext context;
- OrderedGroupedKeyValuesReader(ValuesIterator valuesIter) {
+ OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext context) {
this.valuesIter = valuesIter;
+ this.context = context;
}
@Override
public boolean next() throws IOException {
+ context.notifyProgress();
return valuesIter.moveToNext();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 41ca7c9..2345bbb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -60,7 +60,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
*/
@Override
public KeyValuesReader getReader() throws Exception {
- return new OrderedGroupedMergedKeyValuesReader(getInputs());
+ return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext());
}
@Override
@@ -81,8 +81,10 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
private final ValuesIterable currentValues;
private KeyValuesReader nextKVReader;
private Object currentKey;
+ private final MergedInputContext context;
- public OrderedGroupedMergedKeyValuesReader(List<Input> inputs) throws Exception {
+ public OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext context)
+ throws Exception {
keyComparator = ((OrderedGroupedKVInput) inputs.get(0))
.getInputKeyComparator();
pQueue = new PriorityQueue<KeyValuesReader>(inputs.size(),
@@ -95,6 +97,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
}
}
currentValues = new ValuesIterable();
+ this.context = context;
}
private void advanceAndAddToQueue(KeyValuesReader kvsReadr)
@@ -122,6 +125,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
finishedReaders.clear();
nextKVReader = pQueue.poll();
+ context.notifyProgress();
if (nextKVReader != null) {
currentKey = nextKVReader.getCurrentKey();
currentValues.moveToNext();
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index f5c8091..f1b15d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -168,6 +168,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
return new KeyValueReader() {
@Override
public boolean next() throws IOException {
+ getContext().notifyProgress();
hasCompletedProcessing();
completedProcessing = true;
return false;
@@ -240,7 +241,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength)
throws IOException {
return new UnorderedKVReader(shuffleManager, conf, codec, ifileReadAheadEnabled,
- ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
+ ifileReadAheadLength, ifileBufferSize, inputRecordCounter, getContext());
}
private static final Set<String> confKeys = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 51ea42d..2053642 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -123,7 +124,7 @@ public class TestUnorderedKVReader {
}).when(manager).getNextInput();
unorderedKVReader = new UnorderedKVReader<Text, Text>(manager,
- defaultConf, null, false, -1, -1, inputRecords);
+ defaultConf, null, false, -1, -1, inputRecords, mock(InputContext.class));
}
private void createIFile(Path path, int recordCount) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index dad3aec..a1b3003 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -21,6 +21,9 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -410,6 +413,8 @@ public class TestMergeManager {
assertEquals(m1Prefix, m2Prefix);
assertNotEquals(m1Prefix, m3Prefix);
assertNotEquals(m2Prefix, m3Prefix);
+
+ verify(inputContext, atLeastOnce()).notifyProgress();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 129f4d1..f5076a6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.runtime.library.common.sort.impl;
import com.google.common.collect.Maps;
@@ -41,28 +59,12 @@ import java.util.UUID;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
public class TestPipelinedSorter {
private static Configuration conf = new Configuration();
private static FileSystem localFs = null;
@@ -352,6 +354,7 @@ public class TestPipelinedSorter {
//Verify dataset
verifyData(reader);
reader.close();
+ verify(outputContext, atLeastOnce()).notifyProgress();
}
private void verifyCounters(PipelinedSorter sorter, OutputContext context) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 4022525..91112da 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -429,6 +430,7 @@ public class TestDefaultSorter {
TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+ verify(context, atLeastOnce()).notifyProgress();
}
private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 1a10eb8..e7a2125 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -25,7 +25,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -585,6 +584,8 @@ public class TestUnorderedPartitionedKVWriter {
assertTrue(eventProto.hasPathComponent());
}
+ verify(outputContext, atLeast(1)).notifyProgress();
+
// Verify if all spill files are available.
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
@@ -784,6 +785,7 @@ public class TestUnorderedPartitionedKVWriter {
expectedValues.remove(i);
}
assertEquals(0, expectedValues.size());
+ verify(outputContext, atLeast(1)).notifyProgress();
}
private static String createRandomString(int size) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3dca7d3f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index 0de400e..c8ae67b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -22,21 +22,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.MergedInputContext;
import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.junit.Test;
@@ -44,8 +42,7 @@ import org.junit.Test;
public class TestSortedGroupedMergedInput {
MergedInputContext createMergedInputContext() {
- return new TezMergedInputContextImpl(null, "mergedInputName", new HashMap<String, MergedLogicalInput>(),
- mock(InputReadyTracker.class), null);
+ return mock(MergedInputContext.class);
}
@Test(timeout = 5000)
@@ -67,7 +64,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput1);
sInputs.add(sInput2);
sInputs.add(sInput3);
- OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
+ MergedInputContext mockContext = createMergedInputContext();
+ OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(mockContext, sInputs);
KeyValuesReader kvsReader = input.getReader();
int keyCount = 0;
@@ -84,6 +82,7 @@ public class TestSortedGroupedMergedInput {
}
assertEquals(6, valCount);
}
+ verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
getNextFromFinishedReader(kvsReader);
}
@@ -120,7 +119,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput2);
sInputs.add(sInput3);
- OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
+ OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(
+ createMergedInputContext(), sInputs);
KeyValuesReader kvsReader = input.getReader();
int keyCount = 0;
@@ -384,8 +384,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput1);
sInputs.add(sInput2);
sInputs.add(sInput3);
- ConcatenatedMergedKeyValueInput input =
- new ConcatenatedMergedKeyValueInput(createMergedInputContext(), sInputs);
+ MergedInputContext mockContext = createMergedInputContext();
+ ConcatenatedMergedKeyValueInput input = new ConcatenatedMergedKeyValueInput(mockContext, sInputs);
KeyValueReader kvReader = input.getReader();
int keyCount = 0;
@@ -395,6 +395,7 @@ public class TestSortedGroupedMergedInput {
Integer value = (Integer) kvReader.getCurrentValue();
}
assertTrue(keyCount == 30);
+ verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
getNextFromFinishedReader(kvReader);
}
@@ -418,8 +419,8 @@ public class TestSortedGroupedMergedInput {
sInputs.add(sInput1);
sInputs.add(sInput2);
sInputs.add(sInput3);
- ConcatenatedMergedKeyValuesInput input =
- new ConcatenatedMergedKeyValuesInput(createMergedInputContext(), sInputs);
+ MergedInputContext mockContext = createMergedInputContext();
+ ConcatenatedMergedKeyValuesInput input = new ConcatenatedMergedKeyValuesInput(mockContext, sInputs);
KeyValuesReader kvsReader = input.getReader();
int keyCount = 0;
@@ -435,6 +436,7 @@ public class TestSortedGroupedMergedInput {
assertEquals(2, valCount);
}
assertEquals(9, keyCount);
+ verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
getNextFromFinishedReader(kvsReader);
}