You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/03 14:38:34 UTC

[flink] 04/05: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad281a2753cee7aa5a5c549225ceeae58e2ebd00
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 3 09:19:42 2018 +0200

    [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger
    
    This commit changes the behaviour of the UnilateralSortMerger to keep references to the created
    InMemorySorters in order to explicitly dispose them when the sort merger is closed. This prevents
    InMemorySorters from leaking and blocking the garbage collection of the MemorySegments to which
    they keep references.
    
    This closes #6479.
---
 .../sort/DefaultInMemorySorterFactory.java         |  67 +++++++
 .../operators/sort/InMemorySorterFactory.java      |  37 ++++
 .../operators/sort/UnilateralSortMerger.java       |  63 ++++--
 .../operators/sort/UnilateralSortMergerTest.java   | 213 +++++++++++++++++++++
 4 files changed, 364 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
new file mode 100644
index 0000000..673ac41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Default {@link InMemorySorterFactory}: creates a {@link FixedLengthRecordSorter} if the comparator supports serialization with key normalization and the serializer's record length is positive and within the in-place sorting threshold, otherwise a {@link NormalizedKeySorter}.
+ */
+public class DefaultInMemorySorterFactory<T> implements InMemorySorterFactory<T> {
+
+	@Nonnull
+	private final TypeSerializerFactory<T> typeSerializerFactory;
+
+	@Nonnull
+	private final TypeComparator<T> typeComparator;
+
+	private final boolean useFixedLengthRecordSorter;
+
+	DefaultInMemorySorterFactory(
+			@Nonnull TypeSerializerFactory<T> typeSerializerFactory,
+			@Nonnull TypeComparator<T> typeComparator,
+			int thresholdForInPlaceSorting) {
+		this.typeSerializerFactory = typeSerializerFactory;
+		this.typeComparator = typeComparator;
+
+		TypeSerializer<T> typeSerializer = typeSerializerFactory.getSerializer();
+		// the sorter implementation is decided once here; create() only reads the resulting flag
+		this.useFixedLengthRecordSorter = typeComparator.supportsSerializationWithKeyNormalization() &&
+			typeSerializer.getLength() > 0 && typeSerializer.getLength() <= thresholdForInPlaceSorting;
+	}
+
+	@Override
+	public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+		final TypeSerializer<T> typeSerializer = typeSerializerFactory.getSerializer();
+		final TypeComparator<T> duplicateTypeComparator = typeComparator.duplicate();
+
+		if (useFixedLengthRecordSorter) {
+			return new FixedLengthRecordSorter<>(typeSerializer, duplicateTypeComparator, sortSegments);
+		} else {
+			return new NormalizedKeySorter<>(typeSerializer, duplicateTypeComparator, sortSegments);
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
new file mode 100644
index 0000000..e15f5d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InMemorySorter} instances which are backed by caller-provided memory segments.
+ */
+public interface InMemorySorterFactory<T> {
+
+	/**
+	 * Creates a new {@link InMemorySorter} instance which uses the given memory segments.
+	 *
+	 * @param sortSegments memory segments to initialize the InMemorySorter with
+	 * @return new InMemorySorter instance
+	 */
+	InMemorySorter<T> create(List<MemorySegment> sortSegments);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 934eeb7..1c9a8d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -151,6 +152,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 */
 	protected final boolean objectReuseEnabled;
 
+	private final Collection<InMemorySorter<?>> inMemorySorters;
+
 	// ------------------------------------------------------------------------
 	//                         Constructor & Shutdown
 	// ------------------------------------------------------------------------
@@ -212,9 +215,39 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			int numSortBuffers, int maxNumFileHandles,
 			float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords,
-			boolean objectReuseEnabled)
-	throws IOException
-	{
+			boolean objectReuseEnabled) throws IOException {
+		this (
+			memoryManager,
+			memory,
+			ioManager,
+			input,
+			parentTask,
+			serializerFactory,
+			comparator,
+			numSortBuffers,
+			maxNumFileHandles,
+			startSpillingFraction,
+			noSpillingMemory,
+			handleLargeRecords,
+			objectReuseEnabled,
+			new DefaultInMemorySorterFactory<>(serializerFactory, comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+	}
+
+	protected UnilateralSortMerger(
+			MemoryManager memoryManager,
+			List<MemorySegment> memory,
+			IOManager ioManager,
+			MutableObjectIterator<E> input,
+			AbstractInvokable parentTask,
+			TypeSerializerFactory<E> serializerFactory,
+			TypeComparator<E> comparator,
+			int numSortBuffers,
+			int maxNumFileHandles,
+			float startSpillingFraction,
+			boolean noSpillingMemory,
+			boolean handleLargeRecords,
+			boolean objectReuseEnabled,
+			InMemorySorterFactory<E> inMemorySorterFactory) throws IOException {
 		// sanity checks
 		if (memoryManager == null || (ioManager == null && !noSpillingMemory) || serializerFactory == null || comparator == null) {
 			throw new NullPointerException();
@@ -330,6 +363,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		
 		// circular queues pass buffers between the threads
 		final CircularQueues<E> circularQueues = new CircularQueues<E>();
+
+		inMemorySorters = new ArrayList<>(numSortBuffers);
 		
 		// allocate the sort buffers and fill empty queue with them
 		final Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
@@ -341,20 +376,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				sortSegments.add(segments.next());
 			}
 			
-			final TypeComparator<E> comp = comparator.duplicate();
-			final InMemorySorter<E> buffer;
-			
-			// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-			if (comp.supportsSerializationWithKeyNormalization() &&
-					serializer.getLength() > 0 && serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-			{
-				buffer = new FixedLengthRecordSorter<E>(serializerFactory.getSerializer(), comp, sortSegments);
-			} else {
-				buffer = new NormalizedKeySorter<E>(serializerFactory.getSerializer(), comp, sortSegments);
-			}
+			final InMemorySorter<E> inMemorySorter = inMemorySorterFactory.create(sortSegments);
+			inMemorySorters.add(inMemorySorter);
 
 			// add to empty queue
-			CircularElement<E> element = new CircularElement<E>(i, buffer, sortSegments);
+			CircularElement<E> element = new CircularElement<E>(i, inMemorySorter, sortSegments);
 			circularQueues.empty.add(element);
 		}
 
@@ -494,7 +520,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			}
 		}
 		finally {
-			
+
+			// Dispose all in-memory sorters so that they release their references to the memory segments
+			for (InMemorySorter<?> inMemorySorter : inMemorySorters) {
+				inMemorySorter.dispose();
+			}
+
 			// RELEASE ALL MEMORY. If the threads and channels are still running, this should cause
 			// exceptions, because their memory segments are freed
 			try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
new file mode 100644
index 0000000..cdaa5b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests that the {@link UnilateralSortMerger} disposes the {@link InMemorySorter InMemorySorters} it created when it is closed.
+ */
+public class UnilateralSortMergerTest extends TestLogger {
+
+	@Test
+	public void testInMemorySorterDisposal() throws Exception {
+		final TestingInMemorySorterFactory<Tuple2<Integer, Integer>> inMemorySorterFactory = new TestingInMemorySorterFactory<>();
+
+		final int numPages = 32;
+		final MemoryManager memoryManager = new MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1);
+		final IOManagerAsync ioManager = new IOManagerAsync();
+		final DummyInvokable parentTask = new DummyInvokable();
+
+		try {
+			final List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numPages);
+			final UnilateralSortMerger<Tuple2<Integer, Integer>> unilateralSortMerger = new UnilateralSortMerger<>(
+				memoryManager,
+				memory,
+				ioManager,
+				EmptyMutableObjectIterator.get(),
+				parentTask,
+				TestData.getIntIntTupleSerializerFactory(),
+				TestData.getIntIntTupleComparator(),
+				10,
+				2,
+				1.0f,
+				true,
+				false,
+				false,
+				inMemorySorterFactory);
+
+			final Collection<TestingInMemorySorter<?>> inMemorySorters = inMemorySorterFactory.getInMemorySorters();
+			// the constructor is expected to have created its sorters through the injected factory
+			assertThat(inMemorySorters, is(not(empty())));
+
+			unilateralSortMerger.close();
+
+			assertThat(unilateralSortMerger.closed, is(true));
+			// every sorter handed out by the factory must have been disposed by close()
+			for (TestingInMemorySorter<?> inMemorySorter : inMemorySorters) {
+				assertThat(inMemorySorter.isDisposed(), is(true));
+			}
+		} finally {
+			ioManager.shutdown();
+			memoryManager.shutdown();
+		}
+	}
+
+	private static final class TestingInMemorySorterFactory<T> implements InMemorySorterFactory<T> {
+		// records every sorter created so that the test can inspect them afterwards
+		private final Collection<TestingInMemorySorter<?>> inMemorySorters = new ArrayList<>(10);
+
+		Collection<TestingInMemorySorter<?>> getInMemorySorters() {
+			return inMemorySorters;
+		}
+
+		@Override
+		public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+			final TestingInMemorySorter<T> testingInMemorySorter = new TestingInMemorySorter<>();
+			inMemorySorters.add(testingInMemorySorter);
+			return testingInMemorySorter;
+		}
+	}
+
+	private static final class TestingInMemorySorter<T> implements InMemorySorter<T> {
+		// set to true by dispose(); all other methods are stubs returning fixed values
+		private volatile boolean isDisposed;
+
+		public boolean isDisposed() {
+			return isDisposed;
+		}
+
+		@Override
+		public void reset() {
+			// no-op
+		}
+
+		@Override
+		public boolean isEmpty() {
+			return true;
+		}
+
+		@Override
+		public void dispose() {
+			isDisposed = true;
+		}
+
+		@Override
+		public long getCapacity() {
+			return 0;
+		}
+
+		@Override
+		public long getOccupancy() {
+			return 0;
+		}
+
+		@Override
+		public T getRecord(int logicalPosition) throws IOException {
+			return null;
+		}
+
+		@Override
+		public T getRecord(T reuse, int logicalPosition) throws IOException {
+			return null;
+		}
+
+		@Override
+		public boolean write(T record) throws IOException {
+			return false;
+		}
+
+		@Override
+		public MutableObjectIterator<T> getIterator() {
+			return null;
+		}
+
+		@Override
+		public void writeToOutput(ChannelWriterOutputView output) throws IOException {
+			// no-op
+		}
+
+		@Override
+		public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) throws IOException {
+			// no-op
+		}
+
+		@Override
+		public void writeToOutput(ChannelWriterOutputView output, int start, int num) throws IOException {
+			// no-op
+		}
+
+		@Override
+		public int compare(int i, int j) {
+			return 0;
+		}
+
+		@Override
+		public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+			return 0;
+		}
+
+		@Override
+		public void swap(int i, int j) {
+			// no-op
+		}
+
+		@Override
+		public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+			// no-op
+		}
+
+		@Override
+		public int size() {
+			return 0;
+		}
+
+		@Override
+		public int recordSize() {
+			return 0;
+		}
+
+		@Override
+		public int recordsPerSegment() {
+			return 0;
+		}
+	}
+
+}