You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 23:34:43 UTC
[2/4] flink git commit: [FLINK-2648] [FLINK-2717] [runtime] Harden memory release in sorters against asynchronous canceling
[FLINK-2648] [FLINK-2717] [runtime] Harden memory release in sorters against asynchronous canceling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/435ee4eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/435ee4eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/435ee4eb
Branch: refs/heads/master
Commit: 435ee4ebb5ec2ac0086e1a5136125ecc1d7c89a8
Parents: bd74bae
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 21 21:05:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 23:18:04 2015 +0200
----------------------------------------------------------------------
.../operators/GroupReduceCombineDriver.java | 28 ++++++---
.../runtime/operators/ReduceCombineDriver.java | 26 +++++---
.../chaining/GroupCombineChainedDriver.java | 45 ++++++++------
.../SynchronousChainedCombineDriver.java | 51 ++++++++--------
.../operators/sort/FixedLengthRecordSorter.java | 37 +++---------
.../runtime/operators/sort/InMemorySorter.java | 22 ++++---
.../operators/sort/NormalizedKeySorter.java | 39 +++---------
.../operators/sort/UnilateralSortMerger.java | 18 +++---
.../sort/FixedLengthRecordSorterTest.java | 32 +++++-----
.../operators/sort/NormalizedKeySorterTest.java | 63 ++++++++++----------
10 files changed, 176 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 7115a4d..ab46d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -74,10 +74,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
private QuickSort sortAlgo = new QuickSort();
- private MemoryManager memManager;
-
private Collector<OUT> output;
+ private List<MemorySegment> memory;
+
private long oversizedRecordCount;
private volatile boolean running = true;
@@ -115,9 +115,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
}
-
- this.memManager = this.taskContext.getMemoryManager();
- final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+
+
final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
this.serializer = serializerFactory.getSerializer();
@@ -128,8 +127,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
this.combiner = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();
- final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
- numMemoryPages);
+ MemoryManager memManager = this.taskContext.getMemoryManager();
+ final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+ this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (sortingComparator.supportsSerializationWithKeyNormalization() &&
@@ -218,16 +218,26 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
@Override
public void cleanup() throws Exception {
if (this.sorter != null) {
- this.memManager.release(this.sorter.dispose());
+ this.sorter.dispose();
}
+
+ this.taskContext.getMemoryManager().release(this.memory);
}
@Override
public void cancel() {
this.running = false;
+
if (this.sorter != null) {
- this.memManager.release(this.sorter.dispose());
+ try {
+ this.sorter.dispose();
+ }
+ catch (Exception e) {
+ // may happen during concurrent modification
+ }
}
+
+ this.taskContext.getMemoryManager().release(this.memory);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 19557bc..26da0ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -63,12 +63,12 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
private Collector<T> output;
- private MemoryManager memManager;
-
private InMemorySorter<T> sorter;
private QuickSort sortAlgo = new QuickSort();
+ private List<MemorySegment> memory;
+
private boolean running;
private boolean objectReuseEnabled = false;
@@ -105,10 +105,6 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
}
- this.memManager = this.taskContext.getMemoryManager();
- final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig()
- .getRelativeMemoryDriver());
-
// instantiate the serializer / comparator
final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
this.comparator = this.taskContext.getDriverComparator(0);
@@ -116,7 +112,10 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
this.reducer = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();
- final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
+ MemoryManager memManager = this.taskContext.getMemoryManager();
+ final int numMemoryPages = memManager.computeNumberOfPages(
+ this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+ this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (this.comparator.supportsSerializationWithKeyNormalization() &&
@@ -241,12 +240,21 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
@Override
public void cleanup() {
- this.memManager.release(this.sorter.dispose());
+ this.sorter.dispose();
+ this.taskContext.getMemoryManager().release(this.memory);
}
@Override
public void cancel() {
this.running = false;
- this.memManager.release(this.sorter.dispose());
+
+ try {
+ this.sorter.dispose();
+ }
+ catch (Exception e) {
+ // may happen during concurrent modifications
+ }
+
+ this.taskContext.getMemoryManager().release(this.memory);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 09f4288..08ad25b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -71,15 +71,13 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
private TypeSerializer<IN> serializer;
- private TypeComparator<IN> sortingComparator;
-
private TypeComparator<IN> groupingComparator;
private AbstractInvokable parent;
private QuickSort sortAlgo = new QuickSort();
- private MemoryManager memManager;
+ private List<MemorySegment> memory;
private volatile boolean running = true;
@@ -102,28 +100,28 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
final Configuration stubConfig = this.config.getStubParameters();
RegularPactTask.openUserCode(this.reducer, stubConfig);
- // ----------------- Set up the asynchronous sorter -------------------------
-
- this.memManager = this.parent.getEnvironment().getMemoryManager();
- final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+ // ----------------- Set up the sorter -------------------------
// instantiate the serializer / comparator
final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
this.serializer = serializerFactory.getSerializer();
- this.sortingComparator = sortingComparatorFactory.createComparator();
+
+ TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
this.groupingComparator = groupingComparatorFactory.createComparator();
- final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
+ MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
+ final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+ this.memory = memManager.allocatePages(this.parent, numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
- if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+ if (sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
- this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+ this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
} else {
- this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+ this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
}
if (LOG.isDebugEnabled()) {
@@ -133,19 +131,30 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
@Override
public void closeTask() throws Exception {
- this.memManager.release(this.sorter.dispose());
-
- if (!this.running) {
- return;
+ if (this.sorter != null) {
+ this.sorter.dispose();
}
+ this.parent.getEnvironment().getMemoryManager().release(this.memory);
- RegularPactTask.closeUserCode(this.reducer);
+ if (this.running) {
+ RegularPactTask.closeUserCode(this.reducer);
+ }
}
@Override
public void cancelTask() {
this.running = false;
- this.memManager.release(this.sorter.dispose());
+
+ if (this.sorter != null) {
+ try {
+ this.sorter.dispose();
+ }
+ catch (Exception e) {
+ // may happen during concurrent modification
+ }
+ }
+
+ this.parent.getEnvironment().getMemoryManager().release(this.memory);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 75c1eed..da9698c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -58,9 +58,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
private static final Logger LOG = LoggerFactory.getLogger(SynchronousChainedCombineDriver.class);
- /**
- * Fix length records with a length below this threshold will be in-place sorted, if possible.
- */
+ /** Fix length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
// --------------------------------------------------------------------------------------------
@@ -71,16 +69,14 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
private TypeSerializer<IN> serializer;
- private TypeComparator<IN> sortingComparator;
-
private TypeComparator<IN> groupingComparator;
private AbstractInvokable parent;
- private QuickSort sortAlgo = new QuickSort();
-
- private MemoryManager memManager;
+ private final QuickSort sortAlgo = new QuickSort();
+ private List<MemorySegment> memory;
+
private volatile boolean running = true;
// --------------------------------------------------------------------------------------------
@@ -102,28 +98,29 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
final Configuration stubConfig = this.config.getStubParameters();
RegularPactTask.openUserCode(this.combiner, stubConfig);
- // ----------------- Set up the asynchronous sorter -------------------------
-
- this.memManager = this.parent.getEnvironment().getMemoryManager();
- final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+ // ----------------- Set up the sorter -------------------------
// instantiate the serializer / comparator
final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
+
this.serializer = serializerFactory.getSerializer();
- this.sortingComparator = sortingComparatorFactory.createComparator();
- this.groupingComparator = groupingComparatorFactory.createComparator();
- final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
+ TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
+ this.groupingComparator = groupingComparatorFactory.createComparator();
+
+ MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
+ final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+ this.memory = memManager.allocatePages(this.parent, numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
- if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+ if (sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
- this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+ this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, this.memory);
} else {
- this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+ this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory);
}
if (LOG.isDebugEnabled()) {
@@ -133,19 +130,25 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
@Override
public void closeTask() throws Exception {
- this.memManager.release(this.sorter.dispose());
+ this.sorter.dispose();
+ this.parent.getEnvironment().getMemoryManager().release(this.memory);
- if (!this.running) {
- return;
+ if (this.running) {
+ RegularPactTask.closeUserCode(this.combiner);
}
-
- RegularPactTask.closeUserCode(this.combiner);
}
@Override
public void cancelTask() {
this.running = false;
- this.memManager.release(this.sorter.dispose());
+ try {
+ this.sorter.dispose();
+ }
+ catch (Exception e) {
+ // may happen during concurrent modification when canceling
+ }
+
+ this.parent.getEnvironment().getMemoryManager().release(this.memory);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index e932592..da96b17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -121,13 +121,7 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
this.lastEntryOffset = (this.recordsPerSegment - 1) * this.recordSize;
this.swapBuffer = new byte[this.recordSize];
- if (memory instanceof ArrayList<?>) {
- this.freeMemory = (ArrayList<MemorySegment>) memory;
- }
- else {
- this.freeMemory = new ArrayList<MemorySegment>(memory.size());
- this.freeMemory.addAll(memory);
- }
+ this.freeMemory = new ArrayList<MemorySegment>(memory);
// create the buffer collections
this.sortBuffer = new ArrayList<MemorySegment>(16);
@@ -174,16 +168,10 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
return this.numRecords == 0;
}
- /**
- * Collects all memory segments from this sorter.
- *
- * @return All memory segments from this sorter.
- */
@Override
- public List<MemorySegment> dispose() {
- this.freeMemory.addAll(this.sortBuffer);
+ public void dispose() {
+ this.freeMemory.clear();
this.sortBuffer.clear();
- return this.freeMemory;
}
@Override
@@ -195,23 +183,16 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
public long getOccupancy() {
return this.sortBufferBytes;
}
-
- @Override
- public long getNumRecordBytes() {
- return this.sortBufferBytes;
- }
// -------------------------------------------------------------------------
// Retrieving and Writing
// -------------------------------------------------------------------------
-
- /**
- * Gets the record at the given logical position.
- *
- * @param reuse The reuse object to deserialize the record into.
- * @param logicalPosition The logical position of the record.
- * @throws IOException Thrown, if an exception occurred during deserialization.
- */
+
+ @Override
+ public T getRecord(int logicalPosition) throws IOException {
+ return getRecord(serializer.createInstance(), logicalPosition);
+ }
+
@Override
public T getRecord(T reuse, int logicalPosition) throws IOException {
final int buffer = logicalPosition / this.recordsPerSegment;
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index 633ec70..a47041b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -19,9 +19,7 @@
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
-import java.util.List;
-import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.util.MutableObjectIterator;
@@ -43,11 +41,10 @@ public interface InMemorySorter<T> extends IndexedSortable {
boolean isEmpty();
/**
- * Collects all memory segments from this sorter.
- *
- * @return All memory segments from this sorter.
+ * Disposes the sorter.
+ * This method does not release the memory segments used by the sorter.
*/
- List<MemorySegment> dispose();
+ void dispose();
/**
* Gets the total capacity of this sorter, in bytes.
@@ -62,13 +59,14 @@ public interface InMemorySorter<T> extends IndexedSortable {
* @return The number of bytes occupied.
*/
long getOccupancy();
-
+
/**
- * Gets the number of bytes occupied by records only.
- *
- * @return The number of bytes occupied by records.
+ * Gets the record at the given logical position.
+ *
+ * @param logicalPosition The logical position of the record.
+ * @throws IOException Thrown, if an exception occurred during deserialization.
*/
- long getNumRecordBytes();
+ T getRecord(int logicalPosition) throws IOException;
/**
* Gets the record at the given logical position.
@@ -114,5 +112,5 @@ public interface InMemorySorter<T> extends IndexedSortable {
* @param num The number of elements to write.
* @throws IOException Thrown, if an I/O exception occurred writing to the output view.
*/
- public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException;
+ public void writeToOutput(ChannelWriterOutputView output, int start, int num) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index c05e518..9e1882c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -133,14 +133,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
throw new IllegalArgumentException("Normalized-Key sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers.");
}
this.segmentSize = memory.get(0).size();
-
- if (memory instanceof ArrayList<?>) {
- this.freeMemory = (ArrayList<MemorySegment>) memory;
- }
- else {
- this.freeMemory = new ArrayList<MemorySegment>(memory.size());
- this.freeMemory.addAll(memory);
- }
+ this.freeMemory = new ArrayList<MemorySegment>(memory);
// create the buffer collections
this.sortIndex = new ArrayList<MemorySegment>(16);
@@ -220,20 +213,11 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
return this.numRecords == 0;
}
- /**
- * Collects all memory segments from this sorter.
- *
- * @return All memory segments from this sorter.
- */
@Override
- public List<MemorySegment> dispose() {
- this.freeMemory.addAll(this.sortIndex);
- this.freeMemory.addAll(this.recordBufferSegments);
-
+ public void dispose() {
+ this.freeMemory.clear();
this.recordBufferSegments.clear();
this.sortIndex.clear();
-
- return this.freeMemory;
}
@Override
@@ -245,23 +229,16 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
public long getOccupancy() {
return this.currentDataBufferOffset + this.sortIndexBytes;
}
-
- @Override
- public long getNumRecordBytes() {
- return this.currentDataBufferOffset;
- }
// -------------------------------------------------------------------------
// Retrieving and Writing
// -------------------------------------------------------------------------
- /**
- * Gets the record at the given logical position.
- *
- * @param reuse The target object to deserialize the record into.
- * @param logicalPosition The logical position of the record.
- * @throws IOException Thrown, if an exception occurred during deserialization.
- */
+ @Override
+ public T getRecord(int logicalPosition) throws IOException {
+ return getRecordFromBuffer(readPointer(logicalPosition));
+ }
+
@Override
public T getRecord(T reuse, int logicalPosition) throws IOException {
return getRecordFromBuffer(reuse, readPointer(logicalPosition));
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 32fbb52..13159d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -342,7 +342,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
}
// add to empty queue
- CircularElement<E> element = new CircularElement<E>(i, buffer);
+ CircularElement<E> element = new CircularElement<E>(i, buffer, sortSegments);
circularQueues.empty.add(element);
}
@@ -686,15 +686,18 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
final int id;
final InMemorySorter<E> buffer;
+ final List<MemorySegment> memory;
public CircularElement() {
this.id = -1;
this.buffer = null;
+ this.memory = null;
}
- public CircularElement(int id, InMemorySorter<E> buffer) {
+ public CircularElement(int id, InMemorySorter<E> buffer, List<MemorySegment> memory) {
this.id = id;
this.buffer = buffer;
+ this.memory = memory;
}
}
@@ -1199,7 +1202,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
public void go() throws IOException {
final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
- CircularElement<E> element = null;
+ CircularElement<E> element;
boolean cacheOnly = false;
// ------------------- In-Memory Cache ------------------------
@@ -1236,7 +1239,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
CircularElement<E> circElement;
while ((circElement = this.queues.empty.poll()) != null) {
- memoryForLargeRecordSorting.addAll(circElement.buffer.dispose());
+ circElement.buffer.dispose();
+ memoryForLargeRecordSorting.addAll(circElement.memory);
}
if (memoryForLargeRecordSorting.isEmpty()) {
@@ -1440,10 +1444,10 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
protected final void disposeSortBuffers(boolean releaseMemory) {
while (!this.queues.empty.isEmpty()) {
try {
- final InMemorySorter<?> sorter = this.queues.empty.take().buffer;
- final List<MemorySegment> sorterMem = sorter.dispose();
+ CircularElement<E> element = this.queues.empty.take();
+ element.buffer.dispose();
if (releaseMemory) {
- this.memManager.release(sorterMem);
+ this.memManager.release(element.memory);
}
}
catch (InterruptedException iex) {
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index f40171a..517bec3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -33,14 +33,12 @@ import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-/**
- *
- */
public class FixedLengthRecordSorterTest {
private static final long SEED = 649180756312423613L;
@@ -125,7 +123,8 @@ public class FixedLengthRecordSorterTest {
// System.out.println("RECORDS " + num);
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
@@ -170,7 +169,8 @@ public class FixedLengthRecordSorterTest {
Assert.assertEquals("Incorrect number of records", num, count);
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
@@ -225,7 +225,8 @@ public class FixedLengthRecordSorterTest {
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
/**
@@ -276,7 +277,8 @@ public class FixedLengthRecordSorterTest {
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
/**
@@ -318,7 +320,8 @@ public class FixedLengthRecordSorterTest {
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
@@ -345,11 +348,10 @@ public class FixedLengthRecordSorterTest {
MutableObjectIterator<IntPair> iter = sorter.getIterator();
IntPair readTarget = new IntPair();
- int current = 0;
- int last = 0;
+ int current;
+ int last;
iter.next(readTarget);
- //readTarget.getFieldInto(0, last);
last = readTarget.getKey();
while ((readTarget = iter.next(readTarget)) != null) {
@@ -359,13 +361,11 @@ public class FixedLengthRecordSorterTest {
if (cmp > 0) {
Assert.fail("Next key is not larger or equal to previous key.");
}
-
- int tmp = current;
- current = last;
- last = tmp;
+ last = current;
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
index 3e941dd..f8a8f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
@@ -35,15 +35,15 @@ import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-/**
- */
-public class NormalizedKeySorterTest
-{
+
+public class NormalizedKeySorterTest {
+
private static final long SEED = 649180756312423613L;
private static final long SEED2 = 97652436586326573L;
@@ -76,16 +76,14 @@ public class NormalizedKeySorterTest
}
}
- private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception
- {
- @SuppressWarnings("unchecked")
- RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{Key.class});
+ private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{ Key.class });
return new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
}
@Test
- public void testWriteAndRead() throws Exception
- {
+ public void testWriteAndRead() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
@@ -122,12 +120,12 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
- public void testWriteAndIterator() throws Exception
- {
+ public void testWriteAndIterator() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
@@ -161,12 +159,12 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
- public void testReset() throws Exception
- {
+ public void testReset() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
@@ -217,7 +215,8 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
/**
@@ -226,8 +225,7 @@ public class NormalizedKeySorterTest
* and compares for equality.
*/
@Test
- public void testSwap() throws Exception
- {
+ public void testSwap() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
@@ -270,7 +268,8 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
/**
@@ -279,8 +278,7 @@ public class NormalizedKeySorterTest
* ones.
*/
@Test
- public void testCompare() throws Exception
- {
+ public void testCompare() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
@@ -314,12 +312,12 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
- public void testSort() throws Exception
- {
+ public void testSort() throws Exception {
final int NUM_RECORDS = 559273;
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
@@ -364,15 +362,16 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
public void testSortShortStringKeys() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
-
- @SuppressWarnings("unchecked")
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
@@ -412,7 +411,8 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
@Test
@@ -420,7 +420,7 @@ public class NormalizedKeySorterTest
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
@@ -460,6 +460,7 @@ public class NormalizedKeySorterTest
}
// release the memory occupied by the buffers
- this.memoryManager.release(sorter.dispose());
+ sorter.dispose();
+ this.memoryManager.release(memory);
}
}