You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/04/26 17:25:49 UTC
flink git commit: [FLINK-3722] [runtime] Don't / and % when sorting
Repository: flink
Updated Branches:
refs/heads/master bbc5e29c8 -> 336b95d4e
[FLINK-3722] [runtime] Don't / and % when sorting
Replace division and modulus with addition and subtraction.
This closes #2628
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/336b95d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/336b95d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/336b95d4
Branch: refs/heads/master
Commit: 336b95d4eedc23e5ce37d1739165157e127c65f8
Parents: bbc5e29
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 5 16:13:02 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Apr 26 11:44:48 2017 -0400
----------------------------------------------------------------------
.../operators/sort/FixedLengthRecordSorter.java | 59 +++++---
.../runtime/operators/sort/IndexedSortable.java | 38 +++++
.../operators/sort/NormalizedKeySorter.java | 66 ++++++---
.../flink/runtime/operators/sort/QuickSort.java | 147 ++++++++++++++-----
4 files changed, 234 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index 3a44ab5..22dfd29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -19,11 +19,6 @@
package org.apache.flink.runtime.operators.sort;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
@@ -32,6 +27,11 @@ import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.util.MutableObjectIterator;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
*
*/
@@ -134,6 +134,17 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
this.recordInstance = this.serializer.createInstance();
}
+ @Override
+ public int recordSize() {
+ return recordSize;
+ }
+
+ @Override
+ public int recordsPerSegment() {
+ return recordsPerSegment;
+ }
+
+
// -------------------------------------------------------------------------
// Memory Segment
// -------------------------------------------------------------------------
@@ -254,30 +265,40 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
@Override
public int compare(int i, int j) {
- final int bufferNumI = i / this.recordsPerSegment;
+ final int segmentNumberI = i / this.recordsPerSegment;
final int segmentOffsetI = (i % this.recordsPerSegment) * this.recordSize;
-
- final int bufferNumJ = j / this.recordsPerSegment;
+
+ final int segmentNumberJ = j / this.recordsPerSegment;
final int segmentOffsetJ = (j % this.recordsPerSegment) * this.recordSize;
-
- final MemorySegment segI = this.sortBuffer.get(bufferNumI);
- final MemorySegment segJ = this.sortBuffer.get(bufferNumJ);
-
+
+ return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+ }
+
+ @Override
+ public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+ final MemorySegment segI = this.sortBuffer.get(segmentNumberI);
+ final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ);
+
int val = segI.compare(segJ, segmentOffsetI, segmentOffsetJ, this.numKeyBytes);
return this.useNormKeyUninverted ? val : -val;
}
@Override
public void swap(int i, int j) {
- final int bufferNumI = i / this.recordsPerSegment;
+ final int segmentNumberI = i / this.recordsPerSegment;
final int segmentOffsetI = (i % this.recordsPerSegment) * this.recordSize;
-
- final int bufferNumJ = j / this.recordsPerSegment;
+
+ final int segmentNumberJ = j / this.recordsPerSegment;
final int segmentOffsetJ = (j % this.recordsPerSegment) * this.recordSize;
-
- final MemorySegment segI = this.sortBuffer.get(bufferNumI);
- final MemorySegment segJ = this.sortBuffer.get(bufferNumJ);
-
+
+ swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+ }
+
+ @Override
+ public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+ final MemorySegment segI = this.sortBuffer.get(segmentNumberI);
+ final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ);
+
segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, segmentOffsetJ, this.recordSize);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
index 6d7d499..c09cf03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
@@ -30,15 +30,53 @@ public interface IndexedSortable {
int compare(int i, int j);
/**
+ * Compare records at the given addresses consistent with the semantics of
+ * {@link java.util.Comparator#compare(Object, Object)}.
+ *
+ * @param segmentNumberI index of memory segment containing first record
+ * @param segmentOffsetI offset into memory segment containing first record
+ * @param segmentNumberJ index of memory segment containing second record
+ * @param segmentOffsetJ offset into memory segment containing second record
+ * @return a negative integer, zero, or a positive integer as the
+ * first argument is less than, equal to, or greater than the
+ * second.
+ */
+ int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ);
+
+ /**
* Swap items at the given addresses.
*/
void swap(int i, int j);
/**
+ * Swap records at the given addresses.
+ *
+ * @param segmentNumberI index of memory segment containing first record
+ * @param segmentOffsetI offset into memory segment containing first record
+ * @param segmentNumberJ index of memory segment containing second record
+ * @param segmentOffsetJ offset into memory segment containing second record
+ */
+ void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ);
+
+ /**
* Gets the number of elements in the sortable.
*
* @return The number of elements.
*/
int size();
+ /**
+ * Gets the size of each record, the number of bytes separating the head
+ * of successive records.
+ *
+ * @return The record size
+ */
+ int recordSize();
+
+ /**
+ * Gets the number of elements in each memory segment.
+ *
+ * @return The number of records per segment
+ */
+ int recordsPerSegment();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 2cade8d..0fd6f38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -18,11 +18,6 @@
package org.apache.flink.runtime.operators.sort;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
@@ -34,6 +29,11 @@ import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
*
*/
@@ -167,7 +167,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
// compute the index entry size and limits
this.indexEntrySize = this.numKeyBytes + OFFSET_LEN;
- this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
+ this.indexEntriesPerSegment = this.segmentSize / this.indexEntrySize;
this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize;
this.swapBuffer = new byte[this.indexEntrySize];
@@ -176,6 +176,16 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
this.sortIndex.add(this.currentSortIndexSegment);
}
+ @Override
+ public int recordSize() {
+ return indexEntrySize;
+ }
+
+ @Override
+ public int recordsPerSegment() {
+ return indexEntriesPerSegment;
+ }
+
// -------------------------------------------------------------------------
// Memory Segment
// -------------------------------------------------------------------------
@@ -345,38 +355,48 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
@Override
public int compare(int i, int j) {
- final int bufferNumI = i / this.indexEntriesPerSegment;
+ final int segmentNumberI = i / this.indexEntriesPerSegment;
final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
-
- final int bufferNumJ = j / this.indexEntriesPerSegment;
+
+ final int segmentNumberJ = j / this.indexEntriesPerSegment;
final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
-
- final MemorySegment segI = this.sortIndex.get(bufferNumI);
- final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-
+
+ return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+ }
+
+ @Override
+ public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+ final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+ final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
int val = segI.compare(segJ, segmentOffsetI + OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes);
-
+
if (val != 0 || this.normalizedKeyFullyDetermines) {
return this.useNormKeyUninverted ? val : -val;
}
-
+
final long pointerI = segI.getLong(segmentOffsetI) & POINTER_MASK;
final long pointerJ = segJ.getLong(segmentOffsetJ) & POINTER_MASK;
-
+
return compareRecords(pointerI, pointerJ);
}
@Override
public void swap(int i, int j) {
- final int bufferNumI = i / this.indexEntriesPerSegment;
+ final int segmentNumberI = i / this.indexEntriesPerSegment;
final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
-
- final int bufferNumJ = j / this.indexEntriesPerSegment;
+
+ final int segmentNumberJ = j / this.indexEntriesPerSegment;
final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
-
- final MemorySegment segI = this.sortIndex.get(bufferNumI);
- final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-
+
+ swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+ }
+
+ @Override
+ public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+ final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+ final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, segmentOffsetJ, this.indexEntrySize);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
index 474644e..80ba6d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
@@ -26,9 +26,19 @@ public final class QuickSort implements IndexedSorter {
public QuickSort() {
}
- private static void fix(IndexedSortable s, int p, int r) {
- if (s.compare(p, r) > 0) {
- s.swap(p, r);
+ /**
+ * Fix the records into sorted order, swapping when the first record is
+ * greater than the second record.
+ *
+ * @param s paged sortable
+ * @param pN page number of first record
+ * @param pO page offset of first record
+ * @param rN page number of second record
+ * @param rO page offset of second record
+ */
+ private static void fix(IndexedSortable s, int pN, int pO, int rN, int rO) {
+ if (s.compare(pN, pO, rN, rO) > 0) {
+ s.swap(pN, pO, rN, rO);
}
}
@@ -45,85 +55,154 @@ public final class QuickSort implements IndexedSorter {
/**
* Sort the given range of items using quick sort. {@inheritDoc} If the recursion depth falls below
- * {@link #getMaxDepth},
- * then switch to {@link HeapSort}.
+ * {@link #getMaxDepth}, then switch to {@link HeapSort}.
*/
public void sort(final IndexedSortable s, int p, int r) {
- sortInternal(s, p, r, getMaxDepth(r - p));
+ int recordsPerSegment = s.recordsPerSegment();
+ int recordSize = s.recordSize();
+ int maxOffset = recordSize * (recordsPerSegment - 1);
+
+ int pN = p / recordsPerSegment;
+ int pO = (p % recordsPerSegment) * recordSize;
+
+ int rN = r / recordsPerSegment;
+ int rO = (r % recordsPerSegment) * recordSize;
+
+ sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, pN, pO, r, rN, rO, getMaxDepth(r - p));
}
public void sort(IndexedSortable s) {
sort(s, 0, s.size());
}
- private static void sortInternal(final IndexedSortable s, int p, int r, int depth) {
+ /**
+ * Sort the given range of items using quick sort. If the recursion depth exceeds
+ * {@link #getMaxDepth}, then switch to {@link HeapSort}.
+ *
+ * @param s paged sortable
+ * @param recordsPerSegment number of records per memory segment
+ * @param recordSize number of bytes per record
+ * @param maxOffset offset of the last record in a memory segment
+ * @param p index of first record in range
+ * @param pN page number of first record in range
+ * @param pO page offset of first record in range
+ * @param r index of last-plus-one'th record in range
+ * @param rN page number of last-plus-one'th record in range
+ * @param rO page offset of last-plus-one'th record in range
+ * @param depth remaining recursion depth allowed before switching to heap sort
+ *
+ * @see #sort(IndexedSortable, int, int)
+ */
+ private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset,
+ int p, int pN, int pO, int r, int rN, int rO, int depth) {
while (true) {
if (r - p < 13) {
- for (int i = p; i < r; ++i) {
- for (int j = i; j > p && s.compare(j - 1, j) > 0; --j) {
- s.swap(j, j - 1);
+ // switch to insertion sort
+ int i = p+1, iN, iO; if (pO == maxOffset) { iN = pN+1; iO = 0; } else { iN = pN; iO = pO+recordSize; }
+
+ while (i < r) {
+ int j = i, jN = iN, jO = iO;
+ int jd = j-1, jdN, jdO; if (jO == 0) { jdN = jN-1; jdO = maxOffset; } else { jdN = jN; jdO = jO-recordSize; }
+
+ while (j > p && s.compare(jdN, jdO, jN, jO) > 0) {
+ s.swap(jN, jO, jdN, jdO);
+
+ j = jd; jN = jdN; jO = jdO;
+ jd--; if (jdO == 0) { jdN--; jdO = maxOffset; } else { jdO -= recordSize; }
}
+
+ i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; }
}
return;
}
+
if (--depth < 0) {
- // give up
+ // switch to heap sort
alt.sort(s, p, r);
return;
}
+ int rdN, rdO; if (rO == 0) { rdN = rN-1; rdO = maxOffset; } else { rdN = rN; rdO = rO-recordSize; }
+ int m = (p+r)>>>1, mN = m / recordsPerSegment, mO = (m % recordsPerSegment) * recordSize;
+
// select, move pivot into first position
- fix(s, (p + r) >>> 1, p);
- fix(s, (p + r) >>> 1, r - 1);
- fix(s, p, r - 1);
+ fix(s, mN, mO, pN, pO);
+ fix(s, mN, mO, rdN, rdO);
+ fix(s, pN, pO, rdN, rdO);
// Divide
- int i = p;
- int j = r;
- int ll = p;
- int rr = r;
+ int i = p, iN = pN, iO = pO;
+ int j = r, jN = rN, jO = rO;
+ int ll = p, llN = pN, llO = pO;
+ int rr = r, rrN = rN, rrO = rO;
int cr;
while (true) {
- while (++i < j) {
- if ((cr = s.compare(i, p)) > 0) {
+ i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; }
+
+ while (i < j) {
+ if ((cr = s.compare(iN, iO, pN, pO)) > 0) {
break;
}
- if (0 == cr && ++ll != i) {
- s.swap(ll, i);
+
+ if (0 == cr) {
+ ll++; if (llO == maxOffset) { llN++; llO = 0; } else { llO += recordSize; }
+
+ if (ll != i) {
+ s.swap(llN, llO, iN, iO);
+ }
}
+
+ i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; }
}
- while (--j > i) {
- if ((cr = s.compare(p, j)) > 0) {
+
+ j--; if (jO == 0) { jN--; jO = maxOffset; } else { jO -= recordSize; }
+
+ while (j > i) {
+ if ((cr = s.compare(pN, pO, jN, jO)) > 0) {
break;
}
- if (0 == cr && --rr != j) {
- s.swap(rr, j);
+
+ if (0 == cr) {
+ rr--; if (rrO == 0) { rrN--; rrO = maxOffset; } else { rrO -= recordSize; }
+
+ if (rr != j) {
+ s.swap(rrN, rrO, jN, jO);
+ }
}
+
+ j--; if (jO == 0) { jN--; jO = maxOffset; } else { jO -= recordSize; }
}
if (i < j) {
- s.swap(i, j);
+ s.swap(iN, iO, jN, jO);
} else {
break;
}
}
- j = i;
+ j = i; jN = iN; jO = iO;
// swap pivot- and all eq values- into position
while (ll >= p) {
- s.swap(ll--, --i);
+ i--; if (iO == 0) { iN--; iO = maxOffset; } else { iO -= recordSize; }
+
+ s.swap(llN, llO, iN, iO);
+
+ ll--; if (llO == 0) { llN--; llO = maxOffset; } else { llO -= recordSize; }
}
while (rr < r) {
- s.swap(rr++, j++);
+ s.swap(rrN, rrO, jN, jO);
+
+ rr++; if (rrO == maxOffset) { rrN++; rrO = 0; } else { rrO += recordSize; }
+ j++; if (jO == maxOffset) { jN++; jO = 0; } else { jO += recordSize; }
}
// Conquer
// Recurse on smaller interval first to keep stack shallow
assert i != j;
if (i - p < r - j) {
- sortInternal(s, p, i, depth);
- p = j;
+ sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, pN, pO, i, iN, iO, depth);
+ p = j; pN = jN; pO = jO;
} else {
- sortInternal(s, j, r, depth);
- r = i;
+ sortInternal(s, recordsPerSegment, recordSize, maxOffset, j, jN, jO, r, rN, rO, depth);
+ r = i; rN = iN; rO = iO;
}
}
}