You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/04/26 17:25:49 UTC

flink git commit: [FLINK-3722] [runtime] Don't / and % when sorting

Repository: flink
Updated Branches:
  refs/heads/master bbc5e29c8 -> 336b95d4e


[FLINK-3722] [runtime] Don't / and % when sorting

Replace division and modulus with addition and subtraction.

This closes #2628


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/336b95d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/336b95d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/336b95d4

Branch: refs/heads/master
Commit: 336b95d4eedc23e5ce37d1739165157e127c65f8
Parents: bbc5e29
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 5 16:13:02 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Apr 26 11:44:48 2017 -0400

----------------------------------------------------------------------
 .../operators/sort/FixedLengthRecordSorter.java |  59 +++++---
 .../runtime/operators/sort/IndexedSortable.java |  38 +++++
 .../operators/sort/NormalizedKeySorter.java     |  66 ++++++---
 .../flink/runtime/operators/sort/QuickSort.java | 147 ++++++++++++++-----
 4 files changed, 234 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index 3a44ab5..22dfd29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -19,11 +19,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
@@ -32,6 +27,11 @@ import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 
  */
@@ -134,6 +134,17 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 		this.recordInstance = this.serializer.createInstance();
 	}
 
+	@Override
+	public int recordSize() {
+		return recordSize;
+	}
+
+	@Override
+	public int recordsPerSegment() {
+		return recordsPerSegment;
+	}
+
+
 	// -------------------------------------------------------------------------
 	// Memory Segment
 	// -------------------------------------------------------------------------
@@ -254,30 +265,40 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 
 	@Override
 	public int compare(int i, int j) {
-		final int bufferNumI = i / this.recordsPerSegment;
+		final int segmentNumberI = i / this.recordsPerSegment;
 		final int segmentOffsetI = (i % this.recordsPerSegment) * this.recordSize;
-		
-		final int bufferNumJ = j / this.recordsPerSegment;
+
+		final int segmentNumberJ = j / this.recordsPerSegment;
 		final int segmentOffsetJ = (j % this.recordsPerSegment) * this.recordSize;
-		
-		final MemorySegment segI = this.sortBuffer.get(bufferNumI);
-		final MemorySegment segJ = this.sortBuffer.get(bufferNumJ);
-		
+
+		return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+	}
+
+	@Override
+	public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+		final MemorySegment segI = this.sortBuffer.get(segmentNumberI);
+		final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ);
+
 		int val = segI.compare(segJ, segmentOffsetI, segmentOffsetJ, this.numKeyBytes);
 		return this.useNormKeyUninverted ? val : -val;
 	}
 
 	@Override
 	public void swap(int i, int j) {
-		final int bufferNumI = i / this.recordsPerSegment;
+		final int segmentNumberI = i / this.recordsPerSegment;
 		final int segmentOffsetI = (i % this.recordsPerSegment) * this.recordSize;
-		
-		final int bufferNumJ = j / this.recordsPerSegment;
+
+		final int segmentNumberJ = j / this.recordsPerSegment;
 		final int segmentOffsetJ = (j % this.recordsPerSegment) * this.recordSize;
-		
-		final MemorySegment segI = this.sortBuffer.get(bufferNumI);
-		final MemorySegment segJ = this.sortBuffer.get(bufferNumJ);
-		
+
+		swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+	}
+
+	@Override
+	public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+		final MemorySegment segI = this.sortBuffer.get(segmentNumberI);
+		final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ);
+
 		segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, segmentOffsetJ, this.recordSize);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
index 6d7d499..c09cf03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
@@ -30,15 +30,53 @@ public interface IndexedSortable {
 	int compare(int i, int j);
 
 	/**
+	 * Compare records at the given addresses consistent with the semantics of
+	 * {@link java.util.Comparator#compare(Object, Object)}.
+	 *
+	 * @param segmentNumberI index of memory segment containing first record
+	 * @param segmentOffsetI offset into memory segment containing first record
+	 * @param segmentNumberJ index of memory segment containing second record
+	 * @param segmentOffsetJ offset into memory segment containing second record
+	 * @return a negative integer, zero, or a positive integer as the
+	 *         first argument is less than, equal to, or greater than the
+	 *         second.
+	 */
+	int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ);
+
+	/**
 	 * Swap items at the given addresses.
 	 */
 	void swap(int i, int j);
 
 	/**
+	 * Swap records at the given addresses.
+	 *
+	 * @param segmentNumberI index of memory segment containing first record
+	 * @param segmentOffsetI offset into memory segment containing first record
+	 * @param segmentNumberJ index of memory segment containing second record
+	 * @param segmentOffsetJ offset into memory segment containing second record
+	 */
+	void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ);
+
+	/**
 	 * Gets the number of elements in the sortable.
 	 * 
 	 * @return The number of elements.
 	 */
 	int size();
 
+	/**
+	 * Gets the size of each record, the number of bytes separating the head
+	 * of successive records.
+	 *
+	 * @return The record size
+	 */
+	int recordSize();
+
+	/**
+	 * Gets the number of elements in each memory segment.
+	 *
+	 * @return The number of records per segment
+	 */
+	int recordsPerSegment();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 2cade8d..0fd6f38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
@@ -34,6 +29,11 @@ import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 
  */
@@ -167,7 +167,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 		
 		// compute the index entry size and limits
 		this.indexEntrySize = this.numKeyBytes + OFFSET_LEN;
-		this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
+		this.indexEntriesPerSegment = this.segmentSize / this.indexEntrySize;
 		this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize;
 		this.swapBuffer = new byte[this.indexEntrySize];
 		
@@ -176,6 +176,16 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 		this.sortIndex.add(this.currentSortIndexSegment);
 	}
 
+	@Override
+	public int recordSize() {
+		return indexEntrySize;
+	}
+
+	@Override
+	public int recordsPerSegment() {
+		return indexEntriesPerSegment;
+	}
+
 	// -------------------------------------------------------------------------
 	// Memory Segment
 	// -------------------------------------------------------------------------
@@ -345,38 +355,48 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 
 	@Override
 	public int compare(int i, int j) {
-		final int bufferNumI = i / this.indexEntriesPerSegment;
+		final int segmentNumberI = i / this.indexEntriesPerSegment;
 		final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final int bufferNumJ = j / this.indexEntriesPerSegment;
+
+		final int segmentNumberJ = j / this.indexEntriesPerSegment;
 		final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final MemorySegment segI = this.sortIndex.get(bufferNumI);
-		final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-		
+
+		return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+	}
+
+	@Override
+	public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+		final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+		final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
 		int val = segI.compare(segJ, segmentOffsetI + OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes);
-		
+
 		if (val != 0 || this.normalizedKeyFullyDetermines) {
 			return this.useNormKeyUninverted ? val : -val;
 		}
-		
+
 		final long pointerI = segI.getLong(segmentOffsetI) & POINTER_MASK;
 		final long pointerJ = segJ.getLong(segmentOffsetJ) & POINTER_MASK;
-		
+
 		return compareRecords(pointerI, pointerJ);
 	}
 
 	@Override
 	public void swap(int i, int j) {
-		final int bufferNumI = i / this.indexEntriesPerSegment;
+		final int segmentNumberI = i / this.indexEntriesPerSegment;
 		final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final int bufferNumJ = j / this.indexEntriesPerSegment;
+
+		final int segmentNumberJ = j / this.indexEntriesPerSegment;
 		final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final MemorySegment segI = this.sortIndex.get(bufferNumI);
-		final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-		
+
+		swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+	}
+
+	@Override
+	public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+		final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+		final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
 		segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, segmentOffsetJ, this.indexEntrySize);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
index 474644e..80ba6d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
@@ -26,9 +26,19 @@ public final class QuickSort implements IndexedSorter {
 	public QuickSort() {
 	}
 
-	private static void fix(IndexedSortable s, int p, int r) {
-		if (s.compare(p, r) > 0) {
-			s.swap(p, r);
+	/**
+	 * Fix the records into sorted order, swapping when the first record is
+	 * greater than the second record.
+	 *
+	 * @param s paged sortable
+	 * @param pN page number of first record
+	 * @param pO page offset of first record
+	 * @param rN page number of second record
+	 * @param rO page offset of second record
+	 */
+	private static void fix(IndexedSortable s, int pN, int pO, int rN, int rO) {
+		if (s.compare(pN, pO, rN, rO) > 0) {
+			s.swap(pN, pO, rN, rO);
 		}
 	}
 
@@ -45,85 +55,154 @@ public final class QuickSort implements IndexedSorter {
 
 	/**
 	 * Sort the given range of items using quick sort. {@inheritDoc} If the recursion depth falls below
-	 * {@link #getMaxDepth},
-	 * then switch to {@link HeapSort}.
+	 * {@link #getMaxDepth}, then switch to {@link HeapSort}.
 	 */
 	public void sort(final IndexedSortable s, int p, int r) {
-		sortInternal(s, p, r, getMaxDepth(r - p));
+		int recordsPerSegment = s.recordsPerSegment();
+		int recordSize = s.recordSize();
+		int maxOffset = recordSize * (recordsPerSegment - 1);
+
+		int pN = p / recordsPerSegment;
+		int pO = (p % recordsPerSegment) * recordSize;
+
+		int rN = r / recordsPerSegment;
+		int rO = (r % recordsPerSegment) * recordSize;
+
+		sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, pN, pO, r, rN, rO, getMaxDepth(r - p));
 	}
 
 	public void sort(IndexedSortable s) {
 		sort(s, 0, s.size());
 	}
 
-	private static void sortInternal(final IndexedSortable s, int p, int r, int depth) {
+	/**
+	 * Sort the given range of items using quick sort. If the recursion depth exceeds
+	 * {@link #getMaxDepth}, then switch to {@link HeapSort}.
+	 *
+	 * @param s paged sortable
+	 * @param recordsPerSegment number of records per memory segment
+	 * @param recordSize number of bytes per record
+	 * @param maxOffset offset of the last record in a memory segment
+	 * @param p index of first record in range
+	 * @param pN page number of first record in range
+	 * @param pO page offset of first record in range
+	 * @param r index of last-plus-one'th record in range
+	 * @param rN page number of last-plus-one'th record in range
+	 * @param rO page offset of last-plus-one'th record in range
+	 * @param depth recursion depth
+	 *
+	 * @see #sort(IndexedSortable, int, int)
+	 */
+	private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset,
+			int p, int pN, int pO, int r, int rN, int rO, int depth) {
 		while (true) {
 			if (r - p < 13) {
-				for (int i = p; i < r; ++i) {
-					for (int j = i; j > p && s.compare(j - 1, j) > 0; --j) {
-						s.swap(j, j - 1);
+				// switch to insertion sort
+				int i = p+1, iN, iO; if (pO == maxOffset) { iN = pN+1; iO = 0; } else { iN = pN; iO = pO+recordSize; }
+
+				while (i < r) {
+					int j = i, jN = iN, jO = iO;
+					int jd = j-1, jdN, jdO; if (jO == 0) { jdN = jN-1; jdO = maxOffset; } else { jdN = jN; jdO = jO-recordSize; }
+
+					while (j > p && s.compare(jdN, jdO, jN, jO) > 0) {
+						s.swap(jN, jO, jdN, jdO);
+
+						j = jd; jN = jdN; jO = jdO;
+						jd--; if (jdO == 0) { jdN--; jdO = maxOffset; } else { jdO -= recordSize; }
 					}
+
+					i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; }
 				}
 				return;
 			}
+
 			if (--depth < 0) {
-				// give up
+				// switch to heap sort
 				alt.sort(s, p, r);
 				return;
 			}
 
+			int rdN, rdO; if (rO == 0) { rdN = rN-1; rdO = maxOffset; } else { rdN = rN; rdO = rO-recordSize; }
+			int m = (p+r)>>>1, mN = m / recordsPerSegment, mO = (m % recordsPerSegment) * recordSize;
+
 			// select, move pivot into first position
-			fix(s, (p + r) >>> 1, p);
-			fix(s, (p + r) >>> 1, r - 1);
-			fix(s, p, r - 1);
+			fix(s, mN, mO, pN, pO);
+			fix(s, mN, mO, rdN, rdO);
+			fix(s, pN, pO, rdN, rdO);
 
 			// Divide
-			int i = p;
-			int j = r;
-			int ll = p;
-			int rr = r;
+			int i = p, iN = pN, iO = pO;
+			int j = r, jN = rN, jO = rO;
+			int ll = p, llN = pN, llO = pO;
+			int rr = r, rrN = rN, rrO = rO;
 			int cr;
 			while (true) {
-				while (++i < j) {
-					if ((cr = s.compare(i, p)) > 0) {
+				i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; }
+
+				while (i < j) {
+					if ((cr = s.compare(iN, iO, pN, pO)) > 0) {
 						break;
 					}
-					if (0 == cr && ++ll != i) {
-						s.swap(ll, i);
+
+					if (0 == cr) {
+						ll++; if (llO == maxOffset) { llN++; llO = 0; } else { llO += recordSize; }
+
+						if (ll != i) {
+							s.swap(llN, llO, iN, iO);
+						}
 					}
+
+					i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; }
 				}
-				while (--j > i) {
-					if ((cr = s.compare(p, j)) > 0) {
+
+				j--; if (jO == 0) { jN--; jO = maxOffset; } else { jO -= recordSize; }
+
+				while (j > i) {
+					if ((cr = s.compare(pN, pO, jN, jO)) > 0) {
 						break;
 					}
-					if (0 == cr && --rr != j) {
-						s.swap(rr, j);
+
+					if (0 == cr) {
+						rr--; if (rrO == 0) { rrN--; rrO = maxOffset; } else { rrO -= recordSize; }
+
+						if (rr != j) {
+							s.swap(rrN, rrO, jN, jO);
+						}
 					}
+
+					j--; if (jO == 0) { jN--; jO = maxOffset; } else { jO -= recordSize; }
 				}
 				if (i < j) {
-					s.swap(i, j);
+					s.swap(iN, iO, jN, jO);
 				} else {
 					break;
 				}
 			}
-			j = i;
+			j = i; jN = iN; jO = iO;
 			// swap pivot- and all eq values- into position
 			while (ll >= p) {
-				s.swap(ll--, --i);
+				i--; if (iO == 0) { iN--; iO = maxOffset; } else { iO -= recordSize; }
+
+				s.swap(llN, llO, iN, iO);
+
+				ll--; if (llO == 0) { llN--; llO = maxOffset; } else { llO -= recordSize; }
 			}
 			while (rr < r) {
-				s.swap(rr++, j++);
+				s.swap(rrN, rrO, jN, jO);
+
+				rr++; if (rrO == maxOffset) { rrN++; rrO = 0; } else { rrO += recordSize; }
+				j++; if (jO == maxOffset) { jN++; jO = 0; } else { jO += recordSize; }
 			}
 
 			// Conquer
 			// Recurse on smaller interval first to keep stack shallow
 			assert i != j;
 			if (i - p < r - j) {
-				sortInternal(s, p, i, depth);
-				p = j;
+				sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, pN, pO, i, iN, iO, depth);
+				p = j; pN = jN; pO = jO;
 			} else {
-				sortInternal(s, j, r, depth);
-				r = i;
+				sortInternal(s, recordsPerSegment, recordSize, maxOffset, j, jN, jO, r, rN, rO, depth);
+				r = i; rN = iN; rO = iO;
 			}
 		}
 	}