You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/12/14 03:20:54 UTC
tez git commit: TEZ-2496. Consider scheduling tasks in
ShuffleVertexManager based on the partition sizes from the source.
(rbalamohan)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 3e30e9498 -> 62e4b6ea1
TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/62e4b6ea
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/62e4b6ea
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/62e4b6ea
Branch: refs/heads/branch-0.7
Commit: 62e4b6ea1cd4021791ae3b133e8e80c9a82456d6
Parents: 3e30e94
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Dec 14 07:50:22 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Dec 14 07:50:22 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
pom.xml | 5 +
tez-runtime-library/findbugs-exclude.xml | 9 +
tez-runtime-library/pom.xml | 4 +
.../vertexmanager/ShuffleVertexManager.java | 178 ++++++++++++++++---
.../library/api/TezRuntimeConfiguration.java | 9 +
.../library/common/shuffle/ShuffleUtils.java | 37 +++-
.../common/sort/impl/ExternalSorter.java | 27 +++
.../common/sort/impl/PipelinedSorter.java | 16 +-
.../common/sort/impl/dflt/DefaultSorter.java | 23 ++-
.../output/OrderedPartitionedKVOutput.java | 12 +-
.../runtime/library/utils/DATA_RANGE_IN_MB.java | 49 +++++
.../src/main/proto/ShufflePayloads.proto | 1 +
.../vertexmanager/TestShuffleVertexManager.java | 150 +++++++++++++++-
.../common/shuffle/TestShuffleUtils.java | 6 +-
.../common/sort/impl/TestPipelinedSorter.java | 17 ++
.../sort/impl/dflt/TestDefaultSorter.java | 34 ++++
.../TestOrderedPartitionedKVEdgeConfig.java | 4 +
.../library/output/TestOnFileSortedOutput.java | 8 +
19 files changed, 546 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60ea352..023be60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.
TEZ-2995. Timeline primary filter should only be on callerId and not type.
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
reduce and slow start
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 169353f..bc8f487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,11 @@
<version>3.1.0</version>
</dependency>
<dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>0.4.9</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 45c194c..6b9e851 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -99,4 +99,13 @@
</Match>
+ <Match>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ <Or>
+ <Class name="org.apache.tez.runtime.library.common.sort.impl.ExternalSorter" />
+ <Method name="getPartitionStats"/>
+ <Field name="partitionStats"/>
+ </Or>
+ </Match>
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 9230576..a68ee88 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -26,6 +26,10 @@
<dependencies>
<dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 01dc5a0..1950df2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -28,6 +28,9 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -59,12 +62,17 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexMan
import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -136,7 +144,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int totalNumBipartiteSourceTasks = 0;
int numBipartiteSourceTasksCompleted = 0;
int numVertexManagerEventsReceived = 0;
- List<Integer> pendingTasks = Lists.newLinkedList();
+ List<PendingTaskInfo> pendingTasks = Lists.newLinkedList();
int totalTasksToSchedule = 0;
private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
@@ -150,6 +158,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
long completedSourceTasksOutputSize = 0;
List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList();
+ private int[][] targetIndexes;
+ private int basePartitionRange;
+ private int remainderRangeForLastShuffler;
+ @VisibleForTesting
+ long[] stats; //approximate amount of data to be fetched
+
static class SourceVertexInfo {
EdgeProperty edgeProperty;
boolean vertexIsConfigured;
@@ -172,10 +186,32 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
}
+ static class PendingTaskInfo {
+ private int index;
+ private long outputStats;
+
+ public PendingTaskInfo(int index) {
+ this.index = index;
+ }
+
+ public String toString() {
+ return "[index=" + index + ", outputStats=" + outputStats + "]";
+ }
+ }
+
public ShuffleVertexManager(VertexManagerPluginContext context) {
super(context);
}
+ static int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+ int startIndex = taskIndex * offSetPerTask;
+ int[] indices = new int[partitionRange];
+ for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+ indices[currentIndex] = (startIndex + currentIndex);
+ }
+ return indices;
+ }
+
public static class CustomShuffleEdgeManager extends EdgeManagerPluginOnDemand {
int numSourceTaskOutputs;
int numDestinationTasks;
@@ -242,7 +278,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
} else {
partitionRange = remainderRangeForLastShuffler;
}
-
+
// all inputs from a source task are next to each other in original order
int targetIndex =
sourceTaskIndex * partitionRange
@@ -273,14 +309,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
return EventRouteMetadata.create(1, new int[]{targetIndex});
}
- private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
- int startIndex = taskIndex * offSetPerTask;
- int[] indices = new int[partitionRange];
- for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
- indices[currentIndex] = (startIndex + currentIndex);
- }
- return indices;
- }
+
@Override
public void prepareForRouting() throws Exception {
@@ -494,6 +523,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
schedulePendingTasks();
}
+
@Override
public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
@@ -516,7 +546,25 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
schedulePendingTasks();
}
-
+
+ @VisibleForTesting
+ void parsePartitionStats(RoaringBitmap partitionStats) {
+ Preconditions.checkState(stats != null, "Stats should be initialized");
+ Iterator<Integer> it = partitionStats.iterator();
+ final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
+ final int RANGE_LEN = RANGES.length;
+ while (it.hasNext()) {
+ int pos = it.next();
+ int index = ((pos) / RANGE_LEN);
+ int rangeIndex = ((pos) % RANGE_LEN);
+ //Add to aggregated stats and normalize to DATA_RANGE_IN_MB.
+ if (RANGES[rangeIndex].getSizeInMB() > 0) {
+ stats[index] += RANGES[rangeIndex].getSizeInMB();
+ }
+ }
+ }
+
+
@Override
public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
// currently events from multiple attempts of the same task can be ignored because
@@ -544,6 +592,19 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
sourceTaskOutputSize = proto.getOutputSize();
+ if (proto.hasPartitionStats()) {
+ try {
+ RoaringBitmap partitionStats = new RoaringBitmap();
+ ByteString compressedPartitionStats = proto.getPartitionStats();
+ byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(compressedPartitionStats);
+ ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+ partitionStats.deserialize(new DataInputStream(bin));
+
+ parsePartitionStats(partitionStats);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
srcInfo.numVMEventsReceived++;
srcInfo.outputSize += sourceTaskOutputSize;
completedSourceTasksOutputSize += sourceTaskOutputSize;
@@ -558,13 +619,17 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
+ " total output size: " + completedSourceTasksOutputSize);
}
}
-
+
+
void updatePendingTasks() {
pendingTasks.clear();
- for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
- pendingTasks.add(i);
+ for (int i = 0; i < getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
+ pendingTasks.add(new PendingTaskInfo(i));
}
totalTasksToSchedule = pendingTasks.size();
+ if (stats == null) {
+ stats = new long[totalTasksToSchedule]; // TODO lost previous data
+ }
}
Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() {
@@ -633,7 +698,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
// most shufflers will be assigned this range
- int basePartitionRange = currentParallelism/desiredTaskParallelism;
+ basePartitionRange = currentParallelism/desiredTaskParallelism;
if (basePartitionRange <= 1) {
// nothing to do if range is equal 1 partition. shuffler does it by default
@@ -641,7 +706,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
- int remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
+ remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
(numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
@@ -678,14 +743,30 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
oldEdgeProp.getEdgeSource(), oldEdgeProp.getEdgeDestination());
edgeProperties.put(vertex, newEdgeProp);
}
-
+
getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties);
-
updatePendingTasks();
+ configureTargetMapping(finalTaskParallelism);
}
return true;
}
+ void configureTargetMapping(int tasks) {
+ targetIndexes = new int[tasks][];
+ for (int idx = 0; idx < tasks; ++idx) {
+ int partitionRange = basePartitionRange;
+ if (idx == (tasks - 1)) {
+ partitionRange = ((remainderRangeForLastShuffler > 0)
+ ? remainderRangeForLastShuffler : basePartitionRange);
+ }
+ // skip the basePartitionRange per destination task
+ targetIndexes[idx] = createIndices(partitionRange, idx, basePartitionRange);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("targetIdx[" + idx + "] to " + Arrays.toString(targetIndexes[idx]));
+ }
+ }
+ }
+
void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) {
// determine parallelism before scheduling the first time
// this is the latest we can wait before determining parallelism.
@@ -702,11 +783,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
getContext().doneReconfiguringVertex();
}
+ if (totalNumBipartiteSourceTasks > 0) {
+ //Sort in case partition stats are available
+ sortPendingTasksBasedOnDataSize();
+ }
List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
numTasksToSchedule--;
- Integer taskIndex = pendingTasks.get(0);
+ Integer taskIndex = pendingTasks.get(0).index;
scheduledTasks.add(ScheduleTaskRequest.create(taskIndex, null));
pendingTasks.remove(0);
}
@@ -718,6 +803,59 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
}
+ private void sortPendingTasksBasedOnDataSize() {
+ //Get partition sizes from all source vertices
+ boolean statsUpdated = computePartitionSizes();
+
+ if (statsUpdated) {
+ //Order the pending tasks by data size in descending order (largest first)
+ Collections.sort(pendingTasks, new Comparator<PendingTaskInfo>() {
+ @Override
+ public int compare(PendingTaskInfo left, PendingTaskInfo right) {
+ return (left.outputStats > right.outputStats) ? -1 :
+ ((left.outputStats == right.outputStats) ? 0 : 1);
+ }
+ });
+
+ if (LOG.isDebugEnabled()) {
+ for (PendingTaskInfo pendingTask : pendingTasks) {
+ LOG.debug("Pending task:" + pendingTask.toString());
+ }
+ }
+ }
+ }
+
+ /**
+ * Compute partition sizes in case statistics are available in vertex.
+ *
+ * @return true if the outputStats of at least one pending task was updated, false otherwise
+ */
+ private synchronized boolean computePartitionSizes() {
+ boolean computedPartitionSizes = false;
+ for (PendingTaskInfo taskInfo : pendingTasks) {
+ int index = taskInfo.index;
+ if (targetIndexes != null) { //parallelism has changed.
+ Preconditions.checkState(index < targetIndexes.length,
+ "index=" + index +", targetIndexes length=" + targetIndexes.length);
+ int[] mapping = targetIndexes[index];
+ long totalStats = 0;
+ for (int i : mapping) {
+ totalStats += stats[i];
+ }
+ if ((totalStats > 0) && (taskInfo.outputStats != totalStats)) {
+ computedPartitionSizes = true;
+ taskInfo.outputStats = totalStats;
+ }
+ } else {
+ if ((stats[index] > 0) && (stats[index] != taskInfo.outputStats)) {
+ computedPartitionSizes = true;
+ taskInfo.outputStats = stats[index];
+ }
+ }
+ }
+ return computedPartitionSizes;
+ }
+
/**
* Verify whether each of the source vertices have completed at least 1 task
*
@@ -865,7 +1003,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
+ slowStartMaxSrcCompletionFraction + " auto:" + enableAutoParallelism
+ " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
+ minTaskParallelism);
-
+
if (enableAutoParallelism) {
getContext().vertexReconfigurationPlanned();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3cfbf8e..440c9f4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -134,6 +134,14 @@ public class TezRuntimeConfiguration {
public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2;
/**
+ * Report partition statistics (e.g., for better scheduling in ShuffleVertexManager). TEZ-2496
+ * This can be enabled/disabled at vertex level.
+ */
+ public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = TEZ_RUNTIME_PREFIX +
+ "report.partition.stats";
+ public static final boolean TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = true;
+
+ /**
* Size of the buffer to use if not writing directly to disk.
*/
public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB = TEZ_RUNTIME_PREFIX +
@@ -457,6 +465,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
+ tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1873485..8aca3af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -29,11 +29,15 @@ import java.text.DecimalFormat;
import java.util.BitSet;
import java.util.List;
+import javax.annotation.Nullable;
import javax.crypto.SecretKey;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -406,12 +410,13 @@ public class ShuffleUtils {
* @param spillRecord
* @param numPhysicalOutputs
* @param pathComponent
+ * @param partitionStats
* @throws IOException
*/
public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
- int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent)
- throws IOException {
+ int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
+ @Nullable long[] partitionStats) throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
if (finalMergeEnabled) {
@@ -439,6 +444,16 @@ public class ShuffleUtils {
// up adding up to final outputsize. This is needed for auto-reduce parallelism to work
// properly.
vmBuilder.setOutputSize(outputSize);
+
+ //set partition stats
+ if (partitionStats != null && partitionStats.length > 0) {
+ RoaringBitmap stats = getPartitionStatsForPhysicalOutput(partitionStats);
+ DataOutputBuffer dout = new DataOutputBuffer();
+ stats.serialize(dout);
+ ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+ vmBuilder.setPartitionStats(partitionStatsBytes);
+ }
+
VertexManagerEvent vmEvent = VertexManagerEvent.create(
context.getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer());
eventList.add(vmEvent);
@@ -450,6 +465,24 @@ public class ShuffleUtils {
eventList.add(csdme);
}
+ /**
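+ * Encodes the data size of each physical output (destination) as a bitmap: output i sets bit
+ * (i * number of DATA_RANGE_IN_MB values) + ordinal of the DATA_RANGE_IN_MB range of sizes[i].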
+ *
+ * @param sizes for physical outputs
+ */
+ public static RoaringBitmap getPartitionStatsForPhysicalOutput(long[] sizes) {
+ RoaringBitmap partitionStats = new RoaringBitmap();
+ if (sizes == null || sizes.length == 0) {
+ return partitionStats;
+ }
+ final int RANGE_LEN = DATA_RANGE_IN_MB.values().length;
+ for (int i = 0; i < sizes.length; i++) {
+ int bucket = DATA_RANGE_IN_MB.getRange(sizes[i]).ordinal();
+ int index = i * (RANGE_LEN);
+ partitionStats.add(index + bucket);
+ }
+ return partitionStats;
+ }
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index aba04e0..ac5acb8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -26,6 +26,7 @@ import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -69,6 +70,7 @@ public abstract class ExternalSorter {
public void close() throws IOException {
spillFileIndexPaths.clear();
spillFilePaths.clear();
+ reportStatistics();
}
public abstract void flush() throws IOException;
@@ -116,6 +118,8 @@ public abstract class ExternalSorter {
protected Path finalIndexFile;
protected int numSpills;
+ protected OutputStatisticsReporter statsReporter;
+ protected final long[] partitionStats;
protected final boolean finalMergeEnabled;
protected final boolean sendEmptyPartitionDetails;
@@ -152,6 +156,10 @@ public abstract class ExternalSorter {
this.outputContext = outputContext;
this.conf = conf;
this.partitions = numOutputs;
+ boolean reportPartitionStats = conf.getBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
+ this.partitionStats = (reportPartitionStats) ? (new long[partitions]) : null;
rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
@@ -241,6 +249,8 @@ public abstract class ExternalSorter {
this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions);
this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
+
+ this.statsReporter = outputContext.getStatisticsReporter();
this.finalMergeEnabled = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
@@ -339,4 +349,21 @@ public abstract class ExternalSorter {
public int getNumSpills() {
return numSpills;
}
+
+ public long[] getPartitionStats() {
+ return partitionStats;
+ }
+
+ protected boolean reportPartitionStats() {
+ return (partitionStats != null);
+ }
+
+ protected synchronized void reportStatistics() {
+ // This works for non-started outputs since new counters will be created with an initial value of 0
+ long outputSize = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+ statsReporter.reportDataSize(outputSize);
+ long outputRecords = outputContext.getCounters()
+ .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+ statsReporter.reportItemsProcessed(outputRecords);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 049087b..9708d7c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -271,7 +271,7 @@ public class PipelinedSorter extends ExternalSorter {
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
(numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
- pathComponent);
+ pathComponent, partitionStats);
outputContext.sendEvents(events);
LOG.info(outputContext.getDestinationVertexName() +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -476,6 +476,9 @@ public class PipelinedSorter extends ExternalSorter {
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, i);
+ if (!isFinalMergeEnabled() && reportPartitionStats()) {
+ partitionStats[i] += writer.getCompressedLength();
+ }
}
Path indexFilename =
@@ -538,7 +541,7 @@ public class PipelinedSorter extends ExternalSorter {
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
- sendEmptyPartitionDetails, pathComponent);
+ sendEmptyPartitionDetails, pathComponent, partitionStats);
LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
}
outputContext.sendEvents(events);
@@ -564,6 +567,12 @@ public class PipelinedSorter extends ExternalSorter {
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
indexFilename);
}
+ TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, conf);
+ if (reportPartitionStats()) {
+ for (int i = 0; i < spillRecord.size(); i++) {
+ partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+ }
+ }
numShuffleChunks.setValue(numSpills);
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
return;
@@ -638,6 +647,9 @@ public class PipelinedSorter extends ExternalSorter {
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, parts);
+ if (reportPartitionStats()) {
+ partitionStats[parts] += writer.getCompressedLength();
+ }
}
numShuffleChunks.setValue(1); //final merge has happened.
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index ac90112..6c15a5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -699,8 +699,6 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
}
}
- @Override
- public void close() throws IOException { }
protected class SpillThread extends Thread {
@@ -883,6 +881,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, i);
+ if (!isFinalMergeEnabled() && reportPartitionStats()) {
+ partitionStats[i] += writer.getCompressedLength();
+ }
writer = null;
} finally {
@@ -1079,7 +1080,8 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
- outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent);
+ outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
+ partitionStats);
LOG.info(outputContext.getDestinationVertexName() + ": " +
"Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
@@ -1127,19 +1129,22 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
+ TezSpillRecord spillRecord = null;
if (isFinalMergeEnabled()) {
finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
sameVolRename(filename[0], finalOutputFile);
if (indexCacheList.size() == 0) {
sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
+ spillRecord = new TezSpillRecord(finalIndexFile, conf);
} else {
- indexCacheList.get(0).writeToFile(finalIndexFile, conf);
+ spillRecord = indexCacheList.get(0);
+ spillRecord.writeToFile(finalIndexFile, conf);
}
} else {
List<Event> events = Lists.newLinkedList();
//Since there is only one spill, spill record would be present in cache.
- TezSpillRecord spillRecord = indexCacheList.get(0);
+ spillRecord = indexCacheList.get(0);
Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1, partitions *
MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRecord.writeToFile(indexPath, conf);
@@ -1147,6 +1152,11 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
//No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled
}
+ if (spillRecord != null && reportPartitionStats()) {
+ for(int i=0; i < spillRecord.size(); i++) {
+ partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+ }
+ }
numShuffleChunks.setValue(numSpills);
return;
}
@@ -1276,6 +1286,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, parts);
+ if (reportPartitionStats()) {
+ partitionStats[parts] += writer.getCompressedLength();
+ }
}
numShuffleChunks.setValue(1); //final merge has happened
spillRec.writeToFile(finalIndexFile, conf);
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 2b4c0f4..e418c1b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
@@ -191,13 +190,6 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
returnEvents = generateEmptyEvents();
}
- // This works for non-started outputs since new counters will be created with an initial value of 0
- long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
- getContext().getStatisticsReporter().reportDataSize(outputSize);
- long outputRecords = getContext().getCounters()
- .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
- getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
-
return returnEvents;
}
@@ -207,7 +199,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
boolean isLastEvent = true;
ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
- getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier());
+ getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
+ sorter.getPartitionStats());
}
return eventList;
}
@@ -228,6 +221,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java
new file mode 100644
index 0000000..126f04e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.utils;
+
+import org.apache.commons.math3.util.FastMath;
+
+/**
+ * Coarse size buckets, expressed in MB, for classifying a byte count.
+ * <p/>
+ * {@link #getRange(long)} rounds a byte count up to whole MB and returns the largest bucket
+ * whose threshold does not exceed that value.
+ */
+public enum DATA_RANGE_IN_MB {
+  // Declared largest-first: getRange() returns the first constant whose threshold is met.
+  THOUSAND(1000), HUNDRED(100), TEN(10), ONE(1), ZERO(0);
+
+  private static final long BYTES_PER_MB = 1024L * 1024L;
+
+  private final int sizeInMB;
+
+  private DATA_RANGE_IN_MB(int sizeInMB) {
+    this.sizeInMB = sizeInMB;
+  }
+
+  /**
+   * @return the lower bound of this bucket, in MB
+   */
+  public final int getSizeInMB() {
+    return sizeInMB;
+  }
+
+  /**
+   * Ceiling division for a positive divisor.
+   * <p/>
+   * Computed from quotient and remainder instead of {@code (a + (b - 1)) / b}: that sum
+   * overflows when {@code a} is close to {@code Long.MAX_VALUE}, which made huge sizes wrap
+   * to a negative MB count and land in the ZERO bucket.
+   *
+   * @param a dividend
+   * @param b divisor, expected to be greater than zero
+   * @return the smallest long that is greater than or equal to {@code a / b}
+   */
+  static long ceil(long a, long b) {
+    long quotient = a / b;
+    // Java division truncates toward zero, so only a positive remainder needs rounding up.
+    return (a % b > 0) ? quotient + 1 : quotient;
+  }
+
+  /**
+   * Maps a size in bytes to its bucket.
+   *
+   * @param sizeInBytes size in bytes
+   * @return the largest bucket whose threshold is at most the size rounded up to whole MB;
+   *         ZERO when no threshold is met (e.g. a negative size)
+   */
+  public static final DATA_RANGE_IN_MB getRange(long sizeInBytes) {
+    long sizeInMB = ceil(sizeInBytes, BYTES_PER_MB);
+    for (DATA_RANGE_IN_MB range : values()) {
+      if (sizeInMB >= range.sizeInMB) {
+        return range;
+      }
+    }
+    return ZERO;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index f7b482d..9b0fc16 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -43,6 +43,7 @@ message InputInformationEventPayloadProto {
message VertexManagerEventPayloadProto {
optional int64 output_size = 1;
+ optional bytes partition_stats = 2;
}
message ShuffleEdgeManagerConfigPayloadProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9a9ff27..9d53ebc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -18,10 +18,14 @@
package org.apache.tez.dag.library.vertexmanager;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
@@ -48,11 +52,13 @@ import org.apache.tez.runtime.api.VertexIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.roaringbitmap.RoaringBitmap;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -301,6 +307,43 @@ public class TestShuffleVertexManager {
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
/**
+ * Test partition stats
+ */
+ scheduledTasks.clear();
+ //{5,9,12,18} in bitmap
+ long[] sizes = new long[]{(0l), (1000l * 1000l),
+ (1010 * 1000l * 1000l), (50 * 1000l * 1000l)};
+ vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex");
+
+ manager = createManager(conf, mockContext, 0.01f, 0.75f);
+ manager.onVertexStarted(emptyCompletions);
+ Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+ TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+ vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
+ manager.onVertexManagerEventReceived(vmEvent);
+ Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+ Assert.assertEquals(4, manager.stats.length);
+ Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
+ Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
+ Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
+ Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
+
+ // sending again from a different version of the same task has no impact
+ TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
+ vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
+ manager.onVertexManagerEventReceived(vmEvent);
+ Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+ Assert.assertEquals(4, manager.stats.length);
+ Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
+ Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
+ Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
+ Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
+
+ /**
* Test for TEZ-978
* Delay determining parallelism until enough data has been received.
*/
@@ -511,6 +554,7 @@ public class TestShuffleVertexManager {
String mockManagedVertexId = "Vertex4";
VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+ when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class));
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -631,7 +675,7 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
// source vertex have some tasks. min, max == 0
- manager = createManager(conf, mockContext, 0.f, 0.f);
+ manager = createManager(conf, mockContext, 0.0f, 0.0f);
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.totalTasksToSchedule == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
@@ -926,7 +970,6 @@ public class TestShuffleVertexManager {
throws IOException {
ByteBuffer payload = null;
if (sizes != null) {
- /*
RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
DataOutputBuffer dout = new DataOutputBuffer();
partitionStats.serialize(dout);
@@ -938,7 +981,6 @@ public class TestShuffleVertexManager {
.setPartitionStats(partitionStatsBytes)
.build().toByteString()
.asReadOnlyByteBuffer();
- */
} else {
payload =
VertexManagerEventPayloadProto.newBuilder()
@@ -954,6 +996,108 @@ public class TestShuffleVertexManager {
}
+ /**
+ * Checks that, once partition sizes have been delivered through VertexManagerEvents, the
+ * manager schedules its pending tasks in the order 2, 0, 1 -- which for the sizes used
+ * here is descending reported partition size.
+ */
@Test(timeout = 5000)
+ public void testSchedulingWithPartitionStats() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+ true);
+ conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
+ ShuffleVertexManager manager = null;
+
+ // Sources of the managed vertex: R1 over a SCATTER_GATHER edge, M2 and M3 over BROADCAST edges.
+ HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
+ String r1 = "R1";
+ EdgeProperty eProp1 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.SCATTER_GATHER,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+ String m2 = "M2";
+ EdgeProperty eProp2 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.BROADCAST,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+ String m3 = "M3";
+ EdgeProperty eProp3 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.BROADCAST,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+
+ final String mockManagedVertexId = "R2";
+
+ mockInputVertices.put(r1, eProp1);
+ mockInputVertices.put(m2, eProp2);
+ mockInputVertices.put(m3, eProp3);
+
+ // Every vertex, managed and source alike, is mocked to report 3 tasks.
+ VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+ when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+ when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+ when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
+ when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
+ when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
+
+ // Capture the task indices handed to scheduleTasks(), in request order. The list is
+ // cleared on every invocation, so it only ever reflects the most recent call.
+ final List<Integer> scheduledTasks = Lists.newLinkedList();
+ doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ scheduledTasks.clear();
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
+ scheduledTasks.add(task.getTaskIndex());
+ }
+ return null;
+ }}).when(mockContext).scheduleTasks(anyList());
+
+ // check initialization
+ manager = createManager(conf, mockContext, 0.001f, 0.001f);
+ manager.onVertexStarted(emptyCompletions);
+ Assert.assertTrue(manager.bipartiteSources == 1);
+
+ // Only r1 and m2 are reported CONFIGURED here; m3 follows further down.
+ manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
+
+ Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+ //Report completion of a source task of r1.
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
+
+ //Tasks should be scheduled in task 2, 0, 1 order
+ long[] sizes = new long[]{(100 * 1000l * 1000l), (0l), (5000 * 1000l * 1000l)};
+ VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
+ manager.onVertexManagerEventReceived(vmEvent); //send VM event
+
+ //Second VM event carrying all-zero partition sizes. NOTE(review): this comment used to
+ //say "stats from another vertex", but the event is built with r1 again -- confirm intent.
+ sizes = new long[]{(0l), (0l), (0l)};
+ vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
+ manager.onVertexManagerEventReceived(vmEvent); //send VM event
+
+ //Report completion of a source task of m2.
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
+
+ //Mark m3 CONFIGURED and report completion of one of its source tasks.
+ manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
+ Assert.assertTrue(scheduledTasks.size() == 3);
+
+ //Order of scheduling should be 2,0,1 based on the available partition statistics
+ Assert.assertTrue(scheduledTasks.get(0) == 2);
+ Assert.assertTrue(scheduledTasks.get(1) == 0);
+ Assert.assertTrue(scheduledTasks.get(2) == 1);
+ }
+
+ @Test(timeout = 5000)
public void test_Tez1649_with_mixed_edges() {
Configuration conf = new Configuration();
conf.setBoolean(
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 2e264f6..9f9cd59 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -148,7 +148,7 @@ public class TestShuffleUtils {
int physicalOutputs = 10;
String pathComponent = "/attempt_x_y_0/file.out";
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
- spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+ spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
Assert.assertTrue(events.size() == 1);
Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -186,7 +186,7 @@ public class TestShuffleUtils {
//normal code path where we do final merge all the time
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
- spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+ spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -226,7 +226,7 @@ public class TestShuffleUtils {
//normal code path where we do final merge all the time
ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext,
- spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+ spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 4aa53eb..129f4d1 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -19,6 +19,7 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -131,6 +132,14 @@ public class TestPipelinedSorter {
}
+ /**
+  * Runs the basic sorter scenario with TEZ_RUNTIME_REPORT_PARTITION_STATS set to false.
+  *
+  * @throws IOException propagated from basicTest
+  */
@Test
+ public void testWithoutPartitionStats() throws IOException {
+   conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false);
+   try {
+     //# partition, # of keys, size per key, InitialMem, blockSize
+     basicTest(1, 0, 0, (10 * 1024L * 1024L), 3 << 20);
+   } finally {
+     // conf is a field of the test class: switch reporting back on even when basicTest
+     // throws, so a failure here cannot leave the flag disabled for whatever runs next.
+     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true);
+   }
+ }
+
+ @Test
public void testWithEmptyData() throws IOException {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
//# partition, # of keys, size per key, InitialMem, blockSize
@@ -328,6 +337,13 @@ public class TestPipelinedSorter {
writeData(sorter, numKeys, keySize);
+ //partition stats;
+ boolean partitionStats = conf.getBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration
+ .TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
+ if (partitionStats) {
+ assertTrue(sorter.getPartitionStats() != null);
+ }
verifyCounters(sorter, outputContext);
Path outputFile = sorter.finalOutputFile;
@@ -471,6 +487,7 @@ public class TestPipelinedSorter {
(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
doReturn(execContext).when(outputContext).getExecutionContext();
+ doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();
doReturn(counters).when(outputContext).getCounters();
doReturn(appId).when(outputContext).getApplicationId();
doReturn(1).when(outputContext).getDAGAttemptNumber();
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index ecc44a7..4022525 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -53,6 +53,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -307,6 +308,38 @@ public class TestDefaultSorter {
}
}
+ /**
+  * Builds a DefaultSorter with final merge disabled, writes a small data set that is
+  * expected to produce exactly one spill, and checks that partition stats are exposed
+  * if and only if reporting was switched on.
+  *
+  * @param withStats value applied to TEZ_RUNTIME_REPORT_PARTITION_STATS; the sorter must
+  *                  return non-null partition stats when true and null when false
+  * @throws IOException propagated from the sorter or the test helpers
+  */
+ void testPartitionStats(boolean withStats) throws IOException {
+   conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, withStats);
+   OutputContext outputContext = createTezOutputContext();
+
+   conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+   conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
+
+   MemoryUpdateCallbackHandler memoryHandler = new MemoryUpdateCallbackHandler();
+   long initialMemory = ExternalSorter.getInitialMemoryRequirement(
+       conf, outputContext.getTotalMemoryAvailableToTask());
+   outputContext.requestInitialMemory(initialMemory, memoryHandler);
+
+   DefaultSorter defaultSorter =
+       new DefaultSorter(outputContext, conf, 1, memoryHandler.getMemoryAssigned());
+   writeData(defaultSorter, 1000, 10);
+   assertTrue(defaultSorter.getNumSpills() == 1);
+   verifyCounters(defaultSorter, outputContext);
+
+   // Stats must be present exactly when reporting is enabled.
+   boolean statsPresent = defaultSorter.getPartitionStats() != null;
+   assertTrue(statsPresent == withStats);
+ }
+
+ /** Runs testPartitionStats with reporting enabled, so non-null partition stats are expected. */
+ @Test(timeout = 60000)
+ public void testWithPartitionStats() throws IOException {
+ testPartitionStats(true);
+ }
+
+ /** Runs testPartitionStats with reporting disabled, so null partition stats are expected. */
+ @Test(timeout = 60000)
+ public void testWithoutPartitionStats() throws IOException {
+ testPartitionStats(false);
+ }
+
@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
@@ -418,6 +451,7 @@ public class TestDefaultSorter {
OutputContext context = mock(OutputContext.class);
ExecutionContext execContext = new ExecutionContextImpl("localhost");
+ doReturn(mock(OutputStatisticsReporter.class)).when(context).getStatisticsReporter();
doReturn(execContext).when(context).getExecutionContext();
doReturn(counters).when(context).getCounters();
doReturn(workingDirs).when(context).getWorkDirs();
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index c72dd52..f57731c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -127,6 +127,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
+ additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, "true");
additionalConfs.put("file.shouldExist", "file");
OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
@@ -163,6 +164,9 @@ public class TestOrderedPartitionedKVEdgeConfig {
assertEquals("io", outputConf.get("io.shouldExist"));
assertEquals("file", outputConf.get("file.shouldExist"));
assertEquals("fs", outputConf.get("fs.shouldExist"));
+ assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_REPORT_PARTITION_STATS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
assertEquals(3, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 19eb18a..8942f4b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -35,6 +35,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -166,6 +167,7 @@ public class TestOnFileSortedOutput {
private void startSortedOutput(int partitions) throws Exception {
OutputContext context = createTezOutputContext();
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
@@ -292,6 +294,12 @@ public class TestOnFileSortedOutput {
.parseFrom(
ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
+ ShuffleUserPayloads.VertexManagerEventPayloadProto
+ vmPayload = ShuffleUserPayloads.VertexManagerEventPayloadProto
+ .parseFrom(
+ ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload()));
+
+ assertTrue(vmPayload.hasPartitionStats());
assertEquals(HOST, payload.getHost());
assertEquals(PORT, payload.getPort());
assertEquals(UniqueID, payload.getPathComponent());