You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/03 14:38:30 UTC

[flink] branch release-1.6 updated (c858d31 -> 231c955)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c858d31  [FLINK-9947] [docs] Document unified table sources/sinks/formats
     new 06f6a85  [FLINK-10033] [runtime] Task releases reference to AbstractInvokable
     new 928198b  [hotfix] Let MemoryManager log start up values
     new b8084f9  [hotfix] Fix import order in CombiningUnilateralSortMergerITCase
     new ad281a2  [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger
     new 231c955  [FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../clusterframework/MesosResourceManager.java     | 130 ++++++++-----
 .../clusterframework/MesosResourceManagerTest.java |  21 ++
 .../apache/flink/runtime/memory/MemoryManager.java |   9 +
 .../sort/DefaultInMemorySorterFactory.java         |  67 +++++++
 .../sort/InMemorySorterFactory.java}               |  20 +-
 .../operators/sort/UnilateralSortMerger.java       |  63 ++++--
 .../runtime/resourcemanager/ResourceManager.java   |  90 +++++++--
 .../org/apache/flink/runtime/taskmanager/Task.java |  51 +++--
 .../sort/CombiningUnilateralSortMergerITCase.java  |  23 +--
 .../operators/sort/UnilateralSortMergerTest.java   | 213 +++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/TaskTest.java |   5 +
 11 files changed, 574 insertions(+), 118 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{io/disk/iomanager/BulkBlockChannelReader.java => operators/sort/InMemorySorterFactory.java} (69%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java


[flink] 01/05: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06f6a85ca8d29249766819144f571149ad9fcd3d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 2 20:57:50 2018 +0200

    [FLINK-10033] [runtime] Task releases reference to AbstractInvokable
    
    To guard against memory leaks, the Task releases the reference to its AbstractInvokable
    when it shuts down or cancels.
    
    This closes #6480.
---
 .../org/apache/flink/runtime/taskmanager/Task.java | 51 +++++++++++++++-------
 .../apache/flink/runtime/taskmanager/TaskTest.java |  5 +++
 2 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 60b2ed8..92ae167 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -78,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -243,7 +244,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	/** atomic flag that makes sure the invokable is canceled exactly once upon error. */
 	private final AtomicBoolean invokableHasBeenCanceled;
 
-	/** The invokable of this task, if initialized. */
+	/** The invokable of this task, if initialized. All accesses must copy the reference and
+	 * check for null, as this field is cleared as part of the disposal logic. */
+	@Nullable
 	private volatile AbstractInvokable invokable;
 
 	/** The current execution state of the task. */
@@ -473,6 +476,12 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 		return taskCancellationTimeout;
 	}
 
+	@Nullable
+	@VisibleForTesting
+	AbstractInvokable getInvokable() {
+		return invokable;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Task Execution
 	// ------------------------------------------------------------------------
@@ -762,7 +771,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
 						if (t instanceof CancelTaskException) {
 							if (transitionState(current, ExecutionState.CANCELED)) {
-								cancelInvokable();
+								cancelInvokable(invokable);
 
 								notifyObservers(ExecutionState.CANCELED, null);
 								break;
@@ -773,7 +782,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 								// proper failure of the task. record the exception as the root cause
 								String errorMessage = String.format("Execution of %s (%s) failed.", taskNameWithSubtask, executionId);
 								failureCause = t;
-								cancelInvokable();
+								cancelInvokable(invokable);
 
 								notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
 								break;
@@ -808,6 +817,10 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 			try {
 				LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
 
+				// clear the reference to the invokable. this helps guard against holding references
+				// to the invokable and its structures in cases where this Task object is still referenced
+				this.invokable = null;
+
 				// stop the async dispatcher.
 				// copy dispatcher reference to stack, against concurrent release
 				ExecutorService dispatcher = this.asyncCallDispatcher;
@@ -924,18 +937,20 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	 * @throws IllegalStateException if the {@link Task} is not yet running
 	 */
 	public void stopExecution() {
+		// copy reference to stack, to guard against concurrent setting to null
+		final AbstractInvokable invokable = this.invokable;
+
 		if (invokable != null) {
-			LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);
 			if (invokable instanceof StoppableTask) {
-				Runnable runnable = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							((StoppableTask) invokable).stop();
-						} catch (RuntimeException e) {
-							LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
-							taskManagerActions.failTask(executionId, e);
-						}
+				LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);
+				final StoppableTask stoppable = (StoppableTask) invokable;
+
+				Runnable runnable = () -> {
+					try {
+						stoppable.stop();
+					} catch (Throwable t) {
+						LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, t);
+						taskManagerActions.failTask(executionId, t);
 					}
 				};
 				executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId));
@@ -945,7 +960,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 		} else {
 			throw new IllegalStateException(
 				String.format(
-					"Cannot stop task %s (%s) because it is not yet running.",
+					"Cannot stop task %s (%s) because it is not running.",
 					taskNameWithSubtask,
 					executionId));
 		}
@@ -1010,6 +1025,10 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 				if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
 					// we are canceling / failing out of the running state
 					// we need to cancel the invokable
+
+					// copy reference to guard against concurrent null-ing out the reference
+					final AbstractInvokable invokable = this.invokable;
+
 					if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
 						this.failureCause = cause;
 						notifyObservers(
@@ -1363,9 +1382,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private void cancelInvokable() {
+	private void cancelInvokable(AbstractInvokable invokable) {
 		// in case of an exception during execution, we still call "cancel()" on the task
-		if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
+		if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
 			try {
 				invokable.cancel();
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1829e97..3dfcfb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -174,6 +174,7 @@ public class TaskTest extends TestLogger {
 			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
 			assertFalse(task.isCanceledOrFailed());
 			assertNull(task.getFailureCause());
+			assertNull(task.getInvokable());
 
 			// verify listener messages
 			validateListenerMessage(ExecutionState.RUNNING, task, false);
@@ -202,6 +203,8 @@ public class TaskTest extends TestLogger {
 			// verify final state
 			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 			validateUnregisterTask(task.getExecutionId());
+
+			assertNull(task.getInvokable());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -258,6 +261,8 @@ public class TaskTest extends TestLogger {
 
 			// make sure that the TaskManager received an message to unregister the task
 			validateUnregisterTask(task.getExecutionId());
+
+			assertNull(task.getInvokable());
 		}
 		catch (Exception e) {
 			e.printStackTrace();


[flink] 04/05: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad281a2753cee7aa5a5c549225ceeae58e2ebd00
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 3 09:19:42 2018 +0200

    [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger
    
    This commit changes the behaviour of the UnilateralSortMerger to keep references to the created
    InMemorySorters in order to explicitly dispose of them when the sort merger is closed. This prevents
    InMemorySorters from leaking and blocking the garbage collection of the MemorySegments to which they
    keep references.
    
    This closes #6479.
---
 .../sort/DefaultInMemorySorterFactory.java         |  67 +++++++
 .../operators/sort/InMemorySorterFactory.java      |  37 ++++
 .../operators/sort/UnilateralSortMerger.java       |  63 ++++--
 .../operators/sort/UnilateralSortMergerTest.java   | 213 +++++++++++++++++++++
 4 files changed, 364 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
new file mode 100644
index 0000000..673ac41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Default factory for {@link InMemorySorter}.
+ */
+public class DefaultInMemorySorterFactory<T> implements InMemorySorterFactory<T> {
+
+	@Nonnull
+	private final TypeSerializerFactory<T> typeSerializerFactory;
+
+	@Nonnull
+	private final TypeComparator<T> typeComparator;
+
+	private final boolean useFixedLengthRecordSorter;
+
+	DefaultInMemorySorterFactory(
+			@Nonnull TypeSerializerFactory<T> typeSerializerFactory,
+			@Nonnull TypeComparator<T> typeComparator,
+			int thresholdForInPlaceSorting) {
+		this.typeSerializerFactory = typeSerializerFactory;
+		this.typeComparator = typeComparator;
+
+		TypeSerializer<T> typeSerializer = typeSerializerFactory.getSerializer();
+
+		this.useFixedLengthRecordSorter = typeComparator.supportsSerializationWithKeyNormalization() &&
+			typeSerializer.getLength() > 0 && typeSerializer.getLength() <= thresholdForInPlaceSorting;
+	}
+
+	@Override
+	public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+		final TypeSerializer<T> typeSerializer = typeSerializerFactory.getSerializer();
+		final TypeComparator<T> duplicateTypeComparator = typeComparator.duplicate();
+
+		if (useFixedLengthRecordSorter) {
+			return new FixedLengthRecordSorter<>(typeSerializer, duplicateTypeComparator, sortSegments);
+		} else {
+			return new NormalizedKeySorter<>(typeSerializer, duplicateTypeComparator, sortSegments);
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
new file mode 100644
index 0000000..e15f5d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InMemorySorter}.
+ */
+public interface InMemorySorterFactory<T> {
+
+	/**
+	 * Create an {@link InMemorySorter} instance with the given memory segments.
+	 *
+	 * @param sortSegments to initialize the InMemorySorter with
+	 * @return new InMemorySorter instance
+	 */
+	InMemorySorter<T> create(List<MemorySegment> sortSegments);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 934eeb7..1c9a8d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -151,6 +152,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 */
 	protected final boolean objectReuseEnabled;
 
+	private final Collection<InMemorySorter<?>> inMemorySorters;
+
 	// ------------------------------------------------------------------------
 	//                         Constructor & Shutdown
 	// ------------------------------------------------------------------------
@@ -212,9 +215,39 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			int numSortBuffers, int maxNumFileHandles,
 			float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords,
-			boolean objectReuseEnabled)
-	throws IOException
-	{
+			boolean objectReuseEnabled) throws IOException {
+		this (
+			memoryManager,
+			memory,
+			ioManager,
+			input,
+			parentTask,
+			serializerFactory,
+			comparator,
+			numSortBuffers,
+			maxNumFileHandles,
+			startSpillingFraction,
+			noSpillingMemory,
+			handleLargeRecords,
+			objectReuseEnabled,
+			new DefaultInMemorySorterFactory<>(serializerFactory, comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+	}
+
+	protected UnilateralSortMerger(
+			MemoryManager memoryManager,
+			List<MemorySegment> memory,
+			IOManager ioManager,
+			MutableObjectIterator<E> input,
+			AbstractInvokable parentTask,
+			TypeSerializerFactory<E> serializerFactory,
+			TypeComparator<E> comparator,
+			int numSortBuffers,
+			int maxNumFileHandles,
+			float startSpillingFraction,
+			boolean noSpillingMemory,
+			boolean handleLargeRecords,
+			boolean objectReuseEnabled,
+			InMemorySorterFactory<E> inMemorySorterFactory) throws IOException {
 		// sanity checks
 		if (memoryManager == null || (ioManager == null && !noSpillingMemory) || serializerFactory == null || comparator == null) {
 			throw new NullPointerException();
@@ -330,6 +363,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		
 		// circular queues pass buffers between the threads
 		final CircularQueues<E> circularQueues = new CircularQueues<E>();
+
+		inMemorySorters = new ArrayList<>(numSortBuffers);
 		
 		// allocate the sort buffers and fill empty queue with them
 		final Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
@@ -341,20 +376,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				sortSegments.add(segments.next());
 			}
 			
-			final TypeComparator<E> comp = comparator.duplicate();
-			final InMemorySorter<E> buffer;
-			
-			// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-			if (comp.supportsSerializationWithKeyNormalization() &&
-					serializer.getLength() > 0 && serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-			{
-				buffer = new FixedLengthRecordSorter<E>(serializerFactory.getSerializer(), comp, sortSegments);
-			} else {
-				buffer = new NormalizedKeySorter<E>(serializerFactory.getSerializer(), comp, sortSegments);
-			}
+			final InMemorySorter<E> inMemorySorter = inMemorySorterFactory.create(sortSegments);
+			inMemorySorters.add(inMemorySorter);
 
 			// add to empty queue
-			CircularElement<E> element = new CircularElement<E>(i, buffer, sortSegments);
+			CircularElement<E> element = new CircularElement<E>(i, inMemorySorter, sortSegments);
 			circularQueues.empty.add(element);
 		}
 
@@ -494,7 +520,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			}
 		}
 		finally {
-			
+
+			// Dispose all in memory sorter in order to clear memory references
+			for (InMemorySorter<?> inMemorySorter : inMemorySorters) {
+				inMemorySorter.dispose();
+			}
+
 			// RELEASE ALL MEMORY. If the threads and channels are still running, this should cause
 			// exceptions, because their memory segments are freed
 			try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
new file mode 100644
index 0000000..cdaa5b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link UnilateralSortMerger}.
+ */
+public class UnilateralSortMergerTest extends TestLogger {
+
+	@Test
+	public void testInMemorySorterDisposal() throws Exception {
+		final TestingInMemorySorterFactory<Tuple2<Integer, Integer>> inMemorySorterFactory = new TestingInMemorySorterFactory<>();
+
+		final int numPages = 32;
+		final MemoryManager memoryManager = new MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1);
+		final IOManagerAsync ioManager = new IOManagerAsync();
+		final DummyInvokable parentTask = new DummyInvokable();
+
+		try {
+			final List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numPages);
+			final UnilateralSortMerger<Tuple2<Integer, Integer>> unilateralSortMerger = new UnilateralSortMerger<>(
+				memoryManager,
+				memory,
+				ioManager,
+				EmptyMutableObjectIterator.get(),
+				parentTask,
+				TestData.getIntIntTupleSerializerFactory(),
+				TestData.getIntIntTupleComparator(),
+				10,
+				2,
+				1.0f,
+				true,
+				false,
+				false,
+				inMemorySorterFactory);
+
+			final Collection<TestingInMemorySorter<?>> inMemorySorters = inMemorySorterFactory.getInMemorySorters();
+
+			assertThat(inMemorySorters, is(not(empty())));
+
+			unilateralSortMerger.close();
+
+			assertThat(unilateralSortMerger.closed, is(true));
+
+			for (TestingInMemorySorter<?> inMemorySorter : inMemorySorters) {
+				assertThat(inMemorySorter.isDisposed(), is(true));
+			}
+		} finally {
+			ioManager.shutdown();
+			memoryManager.shutdown();
+		}
+	}
+
+	private static final class TestingInMemorySorterFactory<T> implements InMemorySorterFactory<T> {
+
+		private final Collection<TestingInMemorySorter<?>> inMemorySorters = new ArrayList<>(10);
+
+		Collection<TestingInMemorySorter<?>> getInMemorySorters() {
+			return inMemorySorters;
+		}
+
+		@Override
+		public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+			final TestingInMemorySorter<T> testingInMemorySorter = new TestingInMemorySorter<>();
+			inMemorySorters.add(testingInMemorySorter);
+			return testingInMemorySorter;
+		}
+	}
+
+	private static final class TestingInMemorySorter<T> implements InMemorySorter<T> {
+
+		private volatile boolean isDisposed;
+
+		public boolean isDisposed() {
+			return isDisposed;
+		}
+
+		@Override
+		public void reset() {
+
+		}
+
+		@Override
+		public boolean isEmpty() {
+			return true;
+		}
+
+		@Override
+		public void dispose() {
+			isDisposed = true;
+		}
+
+		@Override
+		public long getCapacity() {
+			return 0;
+		}
+
+		@Override
+		public long getOccupancy() {
+			return 0;
+		}
+
+		@Override
+		public T getRecord(int logicalPosition) throws IOException {
+			return null;
+		}
+
+		@Override
+		public T getRecord(T reuse, int logicalPosition) throws IOException {
+			return null;
+		}
+
+		@Override
+		public boolean write(T record) throws IOException {
+			return false;
+		}
+
+		@Override
+		public MutableObjectIterator<T> getIterator() {
+			return null;
+		}
+
+		@Override
+		public void writeToOutput(ChannelWriterOutputView output) throws IOException {
+
+		}
+
+		@Override
+		public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) throws IOException {
+
+		}
+
+		@Override
+		public void writeToOutput(ChannelWriterOutputView output, int start, int num) throws IOException {
+
+		}
+
+		@Override
+		public int compare(int i, int j) {
+			return 0;
+		}
+
+		@Override
+		public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+			return 0;
+		}
+
+		@Override
+		public void swap(int i, int j) {
+
+		}
+
+		@Override
+		public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+
+		}
+
+		@Override
+		public int size() {
+			return 0;
+		}
+
+		@Override
+		public int recordSize() {
+			return 0;
+		}
+
+		@Override
+		public int recordsPerSegment() {
+			return 0;
+		}
+	}
+
+}


[flink] 05/05: [FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 231c9559ef67cf159e0413b56634a382bc78d93c
Author: gyao <ga...@data-artisans.com>
AuthorDate: Mon Jul 30 10:57:40 2018 +0800

    [FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components
    
    This closes #6464.
---
 .../clusterframework/MesosResourceManager.java     | 130 +++++++++++++--------
 .../clusterframework/MesosResourceManagerTest.java |  21 ++++
 .../runtime/resourcemanager/ResourceManager.java   |  90 ++++++++++----
 3 files changed, 173 insertions(+), 68 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index ea10ff8..776f016 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -85,6 +85,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -139,6 +140,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
 
+	private MesosConfiguration initializedMesosConfig;
+
 	public MesosResourceManager(
 			// base class
 			RpcService rpcService,
@@ -223,9 +226,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	//  Resource Manager overrides
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Starts the Mesos-specifics.
-	 */
 	@Override
 	protected void initialize() throws ResourceManagerException {
 		// create and start the worker store
@@ -236,9 +236,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
 		}
 
-		// register with Mesos
-		// TODO : defer connection until RM acquires leadership
-
+		// Prepare to register with Mesos
 		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
 			.clone()
 			.setCheckpoint(true);
@@ -254,49 +252,86 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			throw new ResourceManagerException("Unable to recover the framework ID.", e);
 		}
 
-		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
 		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+
+		this.selfActor = createSelfActor();
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		}
+		catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	protected CompletableFuture<Void> prepareLeadershipAsync() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
 		schedulerDriver = initializedMesosConfig.createDriver(
 			new MesosResourceManagerSchedulerCallback(),
 			false);
 
 		// create supporting actors
-		selfActor = createSelfActor();
 		connectionMonitor = createConnectionMonitor();
 		launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
 		reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
 		taskMonitor = createTaskMonitor(schedulerDriver);
 
-		// recover state
-		try {
-			recoverWorkers();
-		} catch (Exception e) {
-			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
-		}
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
 
-		// configure the artifact server to serve the TM container artifacts
-		try {
-			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
-		}
-		catch (IOException e) {
-			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
-		}
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
 
-		// begin scheduling
-		connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-		schedulerDriver.start();
+			LOG.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	protected CompletableFuture<Void> clearStateAsync() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		workersInLaunch.clear();
+		workersBeingReturned.clear();
 
-		LOG.info("Mesos resource manager initialized.");
+		return stopSupportingActorsAsync();
 	}
 
 	/**
-	 * Recover framework/worker information persisted by a prior incarnation of the RM.
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM.
 	 */
-	private void recoverWorkers() throws Exception {
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
 		// if this resource manager is recovering from failure,
 		// then some worker tasks are most likely still alive and we can re-obtain them
-		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		}, getRpcService().getExecutor());
+	}
 
+	/**
+	 * Recovers given framework/worker information.
+	 *
+	 * @see #getWorkersAsync()
+	 */
+	private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts) {
 		assert(workersInNew.isEmpty());
 		assert(workersInLaunch.isEmpty());
 		assert(workersBeingReturned.isEmpty());
@@ -307,15 +342,10 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
-				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
-
 				switch(worker.state()) {
-					case New:
-						// remove new workers because allocation requests are transient
-						workerStore.removeWorker(worker.taskID());
-						break;
 					case Launched:
 						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+						final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
 						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
 						break;
 					case Released:
@@ -332,8 +362,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		}
 	}
 
-	@Override
-	public CompletableFuture<Void> postStop() {
+	private CompletableFuture<Void> stopSupportingActorsAsync() {
 		FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
 
 		CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout);
@@ -348,23 +377,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
 		reconciliationCoordinator = null;
 
-		CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+		return CompletableFuture.allOf(
 			stopTaskMonitorFuture,
 			stopConnectionMonitorFuture,
 			stopLaunchCoordinatorFuture,
 			stopReconciliationCoordinatorFuture);
+	}
 
-		final CompletableFuture<Void> terminationFuture = super.postStop();
-
-		return stopFuture.thenCombine(
-			terminationFuture,
-			(Void voidA, Void voidB) -> null);
+	@Override
+	public CompletableFuture<Void> postStop() {
+		return stopSupportingActorsAsync().thenCompose((ignored) -> super.postStop());
 	}
 
 	@Override
 	protected void internalDeregisterApplication(
 			ApplicationStatus finalStatus,
 			@Nullable String diagnostics) throws ResourceManagerException {
+
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
 
 		Exception exception = null;
@@ -630,11 +659,16 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	 * Tries to shut down the given actor gracefully.
 	 *
 	 * @param actorRef specifying the actor to shut down
-	 * @param timeout for the graceful shut down
-	 * @return Future containing the result of the graceful shut down
+	 * @param timeout  for the graceful shut down
+	 * @return A future that finishes with {@code true} if and only if the actor could be stopped
+	 * gracefully or {@code actorRef} was {@code null}.
 	 */
-	private CompletableFuture<Boolean> stopActor(final ActorRef actorRef, FiniteDuration timeout) {
-		return FutureUtils.toJava(Patterns.gracefulStop(actorRef, timeout))
+	private CompletableFuture<Boolean> stopActor(@Nullable final ActorRef actorRef, FiniteDuration timeout) {
+		if (actorRef == null) {
+			return CompletableFuture.completedFuture(true);
+		}
+
+		return FutureUtils.<Boolean>toJava(Patterns.gracefulStop(actorRef, timeout))
 			.exceptionally(
 				(Throwable throwable) -> {
 					// The actor did not stop gracefully in time, try to directly stop it
@@ -642,7 +676,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
 					log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable);
 
-					return true;
+					return false;
 				}
 			);
 	}
@@ -797,7 +831,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
 		@Override
 		public void disconnected(SchedulerDriver driver) {
-			runAsync(new Runnable() {
+			runAsyncWithoutFencing(new Runnable() {
 				@Override
 				public void run() {
 					MesosResourceManager.this.disconnected(new Disconnected());
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 9fa8c0e..9f4fc26 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -513,6 +513,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		@Override
 		public void close() throws Exception {
 			rpcService.stopService().get();
+			fatalErrorHandler.rethrowError();
 		}
 	}
 
@@ -815,4 +816,24 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
 		}};
 	}
+
+	@Test
+	public void testClearStateAfterRevokeLeadership() throws Exception {
+		new Context() {{
+			final MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+			final MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+			final MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3)).thenReturn(Collections.emptyList());
+
+			startResourceManager();
+			rmServices.rmLeaderElectionService.notLeader();
+			rmServices.grantLeadership();
+
+			assertThat(resourceManager.workersInNew.size(), equalTo(0));
+			assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+			assertThat(resourceManager.workersBeingReturned.size(), equalTo(0));
+			verify(rmServices.schedulerDriver).stop(true);
+		}};
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index fb1b888..ff319b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -142,6 +142,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	/** All registered listeners for status updates of the ResourceManager. */
 	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
+	/**
+	 * Represents asynchronous state clearing work.
+	 *
+	 * @see #clearStateAsync()
+	 * @see #clearStateInternal()
+	 */
+	private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
+
 	public ResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -197,6 +205,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
+		initialize();
+
 		try {
 			leaderElectionService.start(this);
 		} catch (Exception e) {
@@ -208,8 +218,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		} catch (Exception e) {
 			throw new ResourceManagerException("Could not start the job leader id service.", e);
 		}
-
-		initialize();
 	}
 
 	@Override
@@ -238,7 +246,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		clearState();
+		clearStateInternal();
 
 		if (exception != null) {
 			return FutureUtils.completedExceptionally(
@@ -731,7 +739,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 	}
 
-	private void clearState() {
+	private void clearStateInternal() {
 		jobManagerRegistrations.clear();
 		jmResourceIdRegistrations.clear();
 		taskExecutors.clear();
@@ -741,6 +749,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		} catch (Exception e) {
 			onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
 		}
+		clearStateFuture = clearStateAsync();
 	}
 
 	/**
@@ -893,26 +902,45 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		runAsyncWithoutFencing(
-			() -> {
-				final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
-
-				log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
+		final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
+			.thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
+
+		final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
+			(acceptLeadership) -> {
+				if (acceptLeadership) {
+					// confirming the leader session ID might be blocking, so run it on the RPC service executor
+					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+				}
+			},
+			getRpcService().getExecutor());
 
-				// clear the state if we've been the leader before
-				if (getFencingToken() != null) {
-					clearState();
+		confirmationFuture.whenComplete(
+			(Void ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					onFatalError(ExceptionUtils.stripCompletionException(throwable));
 				}
+			});
+	}
 
-				setFencingToken(newResourceManagerId);
+	private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
+		if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+			final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
 
-				slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+			log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
 
-				getRpcService().execute(
-					() ->
-						// confirming the leader session ID might be blocking,
-						leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
-			});
+			// clear the state if we've been the leader before
+			if (getFencingToken() != null) {
+				clearStateInternal();
+			}
+
+			setFencingToken(newResourceManagerId);
+
+			slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+
+			return prepareLeadershipAsync().thenApply(ignored -> true);
+		} else {
+			return CompletableFuture.completedFuture(false);
+		}
 	}
 
 	/**
@@ -924,7 +952,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			() -> {
 				log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
 
-				clearState();
+				clearStateInternal();
 
 				setFencingToken(null);
 
@@ -954,6 +982,28 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	protected abstract void initialize() throws ResourceManagerException;
 
 	/**
+	 * This method can be overridden to add a (non-blocking) initialization routine to the
+	 * ResourceManager that will be called when leadership is granted but before leadership is
+	 * confirmed.
+	 *
+	 * @return Returns a {@code CompletableFuture} that completes when the computation is finished.
+	 */
+	protected CompletableFuture<Void> prepareLeadershipAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
+	/**
+	 * This method can be overridden to add a (non-blocking) state clearing routine to the
+	 * ResourceManager that will be called whenever its state is cleared, e.g. when leadership is revoked.
+	 *
+	 * @return Returns a {@code CompletableFuture} that completes when the state clearing routine
+	 * is finished.
+	 */
+	protected CompletableFuture<Void> clearStateAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
+	/**
 	 * The framework specific code to deregister the application. This should report the
 	 * application's final status and shut down the resource manager cleanly.
 	 *


[flink] 03/05: [hotfix] Fix import order in CombiningUnilateralSortMergerITCase

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8084f9d9708130ef283d7cb9ffe0163db54aae1
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 3 09:19:13 2018 +0200

    [hotfix] Fix import order in CombiningUnilateralSortMergerITCase
---
 .../sort/CombiningUnilateralSortMergerITCase.java  | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index fa675f6..d32cad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -18,21 +18,11 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
 import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -47,9 +37,20 @@ import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.Valu
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 public class CombiningUnilateralSortMergerITCase extends TestLogger {
 	


[flink] 02/05: [hotfix] Let MemoryManager log start up values

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 928198b9e4aa8d931042af0a03c32bcb37802731
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 2 15:32:42 2018 +0200

    [hotfix] Let MemoryManager log start up values
---
 .../main/java/org/apache/flink/runtime/memory/MemoryManager.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index f3bea87..c450880 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -176,6 +176,15 @@ public class MemoryManager {
 			default:
 				throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
 		}
+
+		LOG.debug("Initialized MemoryManager with total memory size {}, number of slots {}, page size {}, " +
+				"memory type {}, pre allocate memory {} and number of non allocated pages {}.",
+			memorySize,
+			numberOfSlots,
+			pageSize,
+			memoryType,
+			preAllocateMemory,
+			numNonAllocatedPages);
 	}
 
 	// ------------------------------------------------------------------------