You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/05 20:57:47 UTC

[2/2] flink git commit: [FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

[FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

Whenever a StreamRecordQueueEntry has been completed we no longer need the registered timeout.
Therefore, we have to cancel the corresponding ScheduledFuture so that the system knows that
it can remove the associated TriggerTask. This is important since the TriggerTask contains a
reference on the StreamRecordQueueEntry. Consequently, such a task will prevent the
StreamRecordQueueEntry from being garbage collected.

This closes #3264.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/215776b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/215776b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/215776b8

Branch: refs/heads/master
Commit: 215776b81a52cd380e8ccabd65da612f77da25e6
Parents: 43d2fd2
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Feb 3 16:02:55 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Feb 5 21:57:01 2017 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  | 13 ++-
 .../operators/async/AsyncWaitOperatorTest.java  | 86 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/215776b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 6793620..a70d825 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -50,6 +51,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -203,7 +205,7 @@ public class AsyncWaitOperator<IN, OUT>
 			// register a timeout for this AsyncStreamRecordBufferEntry
 			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
 
-			getProcessingTimeService().registerTimer(
+			final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
 				timeoutTimestamp,
 				new ProcessingTimeCallback() {
 					@Override
@@ -212,6 +214,15 @@ public class AsyncWaitOperator<IN, OUT>
 							new TimeoutException("Async function call has timed out."));
 					}
 				});
+
+			// Cancel the timer once we've completed the stream record buffer entry. This will remove
+			// the register trigger task
+			streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
+				@Override
+				public void accept(StreamElementQueueEntry<Collection<OUT>> value) {
+					timerFuture.cancel(true);
+				}
+			}, executor);
 		}
 
 		addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/215776b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 4558e06..c2b0803 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -49,10 +49,13 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -76,12 +79,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -801,4 +807,84 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			super.close();
 		}
 	}
+
+	/**
+	 * FLINK-5652
+	 * Tests that registered timers are properly canceled upon completion of a
+	 * {@link StreamRecordQueueEntry} in order to avoid resource leaks because TriggerTasks hold
+	 * a reference on the StreamRecordQueueEntry.
+	 */
+	@Test
+	public void testTimeoutCleanup() throws Exception {
+		final Object lock = new Object();
+
+		final long timeout = 100000L;
+		final long timestamp = 1L;
+
+		Environment environment = mock(Environment.class);
+		when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+		when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
+		when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
+		when(environment.getTaskInfo()).thenReturn(new TaskInfo(
+			"testTask",
+			1,
+			0,
+			1,
+			0));
+
+		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+
+		ProcessingTimeService processingTimeService = mock(ProcessingTimeService.class);
+		when(processingTimeService.getCurrentProcessingTime()).thenReturn(timestamp);
+		doReturn(scheduledFuture).when(processingTimeService).registerTimer(anyLong(), any(ProcessingTimeCallback.class));
+
+		StreamTask<?, ?> containingTask = mock(StreamTask.class);
+		when(containingTask.getEnvironment()).thenReturn(environment);
+		when(containingTask.getCheckpointLock()).thenReturn(lock);
+		when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);
+
+		StreamConfig streamConfig = mock(StreamConfig.class);
+		doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+
+		Output<StreamRecord<Integer>> output = mock(Output.class);
+
+		AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+			new AsyncFunction<Integer, Integer>() {
+				private static final long serialVersionUID = -3718276118074877073L;
+
+				@Override
+				public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+					collector.collect(Collections.singletonList(input));
+				}
+			},
+			timeout,
+			1,
+			AsyncDataStream.OutputMode.UNORDERED);
+
+		operator.setup(
+			containingTask,
+			streamConfig,
+			output);
+
+		operator.open();
+
+		final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, timestamp);
+
+		synchronized (lock) {
+			// processing an element will register a timeout
+			operator.processElement(streamRecord);
+		}
+
+		synchronized (lock) {
+			// closing the operator waits until all inputs have been processed
+			operator.close();
+		}
+
+		// check that we actually outputted the result of the single input
+		verify(output).collect(eq(streamRecord));
+		verify(processingTimeService).registerTimer(eq(processingTimeService.getCurrentProcessingTime() + timeout), any(ProcessingTimeCallback.class));
+
+		// check that we have cancelled our registered timeout
+		verify(scheduledFuture).cancel(eq(true));
+	}
 }