You are viewing a plain-text version of this content; the canonical link to it (a hyperlink in the original page) is not preserved in this text.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/15 14:50:36 UTC
[2/4] flink git commit: [FLINK-2655] Minimize intermediate merging of external merge sort.
[FLINK-2655] Minimize intermediate merging of external merge sort.
This closes #1118
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a75dd627
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a75dd627
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a75dd627
Branch: refs/heads/master
Commit: a75dd6270fd47720ae1c2ce9464ff6b8f7b43d39
Parents: e78b80c
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 10 10:16:35 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:20:01 2015 +0200
----------------------------------------------------------------------
.../operators/sort/UnilateralSortMerger.java | 47 +++++++++++---------
1 file changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a75dd627/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index fd1062d..32fbb52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1513,11 +1513,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
}
/**
- * Merges the given sorted runs to a smaller number of sorted runs.
- *
+ * Merges the given sorted runs to a smaller number of sorted runs.
+ *
* @param channelIDs The IDs of the sorted runs that need to be merged.
+ * @param allReadBuffers
* @param writeBuffers The buffers to be used by the writers.
-
* @return A list of the IDs of the merged channels.
* @throws IOException Thrown, if the readers or writers encountered an I/O problem.
*/
@@ -1525,34 +1525,41 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
throws IOException
{
- final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
- final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
-
+ // A channel list with length maxFanIn<sup>i</sup> can be merged to maxFanIn files in i-1 rounds where every merge
+ // is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
+ // inputs. It is most efficient to perform the partial round first.
+ final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
+
+ final int numStart = channelIDs.size();
+ final int numEnd = (int) Math.pow(this.maxFanIn, scale);
+
+ final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
+
+ final int numNotMerged = numEnd - numMerges;
+ final int numToMerge = numStart - numNotMerged;
+
+ // unmerged channel IDs are copied directly to the result list
+ final List<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>(numEnd);
+ mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
+
+ final int channelsToMergePerStep = (int) Math.ceil(numToMerge / (double) numMerges);
+
// allocate the memory for the merging step
final List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
-
- // the list containing the IDs of the merged channels
- final ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>((int) (numMerges + 1));
- final ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
- int channelNum = 0;
+ final List<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
+ int channelNum = numNotMerged;
while (isRunning() && channelNum < channelIDs.size()) {
channelsToMergeThisStep.clear();
for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); i++, channelNum++) {
channelsToMergeThisStep.add(channelIDs.get(channelNum));
}
-
- // merge only, if there is more than one channel
- if (channelsToMergeThisStep.size() < 2) {
- mergedChannelIDs.addAll(channelsToMergeThisStep);
- }
- else {
- mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
- }
+
+ mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
}
-
+
return mergedChannelIDs;
}