You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/15 14:50:36 UTC

[2/4] flink git commit: [FLINK-2655] Minimize intermediate merging of external merge sort.

[FLINK-2655] Minimize intermediate merging of external merge sort.

This closes #1118


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a75dd627
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a75dd627
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a75dd627

Branch: refs/heads/master
Commit: a75dd6270fd47720ae1c2ce9464ff6b8f7b43d39
Parents: e78b80c
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 10 10:16:35 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:20:01 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/UnilateralSortMerger.java    | 47 +++++++++++---------
 1 file changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a75dd627/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index fd1062d..32fbb52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1513,11 +1513,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		}
 
 		/**
-		 * Merges the given sorted runs to a smaller number of sorted runs. 
-		 * 
+		 * Merges the given sorted runs to a smaller number of sorted runs.
+		 *
 		 * @param channelIDs The IDs of the sorted runs that need to be merged.
+		 * @param allReadBuffers
 		 * @param writeBuffers The buffers to be used by the writers.
-
 		 * @return A list of the IDs of the merged channels.
 		 * @throws IOException Thrown, if the readers or writers encountered an I/O problem.
 		 */
@@ -1525,34 +1525,41 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 					final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
 		throws IOException
 		{
-			final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
-			final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
-			
+			// A channel list with length maxFanIn<sup>i</sup> can be merged to maxFanIn files in i-1 rounds where every merge
+			// is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
+			// inputs. It is most efficient to perform the partial round first.
+			final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
+
+			final int numStart = channelIDs.size();
+			final int numEnd = (int) Math.pow(this.maxFanIn, scale);
+
+			final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
+
+			final int numNotMerged = numEnd - numMerges;
+			final int numToMerge = numStart - numNotMerged;
+
+			// unmerged channel IDs are copied directly to the result list
+			final List<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>(numEnd);
+			mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
+
+			final int channelsToMergePerStep = (int) Math.ceil(numToMerge / (double) numMerges);
+
 			// allocate the memory for the merging step
 			final List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
 			getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
-			
-			// the list containing the IDs of the merged channels
-			final ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>((int) (numMerges + 1));
 
-			final ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
-			int channelNum = 0;
+			final List<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
+			int channelNum = numNotMerged;
 			while (isRunning() && channelNum < channelIDs.size()) {
 				channelsToMergeThisStep.clear();
 
 				for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); i++, channelNum++) {
 					channelsToMergeThisStep.add(channelIDs.get(channelNum));
 				}
-				
-				// merge only, if there is more than one channel
-				if (channelsToMergeThisStep.size() < 2)  {
-					mergedChannelIDs.addAll(channelsToMergeThisStep);
-				}
-				else {
-					mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
-				}
+
+				mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
 			}
-			
+
 			return mergedChannelIDs;
 		}