You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/05 20:59:45 UTC

flink git commit: [FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

Repository: flink
Updated Branches:
  refs/heads/release-1.2 28eea24e4 -> 36c7de1ae


[FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

Whenever a StreamRecordQueueEntry has been completed we no longer need the registered timeout.
Therefore, we have to cancel the corresponding ScheduledFuture so that the system knows that
it can remove the associated TriggerTask. This is important since the TriggerTask contains a
reference on the StreamRecordQueueEntry. Consequently, such a task will prevent the
StreamRecordQueueEntry from being garbage collected.

This closes #3264.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36c7de1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36c7de1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36c7de1a

Branch: refs/heads/release-1.2
Commit: 36c7de1aef7b349b9d66c9d92398f50ebec9d186
Parents: 28eea24
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Feb 3 16:02:55 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Feb 5 21:59:18 2017 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  | 13 ++-
 .../operators/async/AsyncWaitOperatorTest.java  | 86 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36c7de1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 6793620..a70d825 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -50,6 +51,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -203,7 +205,7 @@ public class AsyncWaitOperator<IN, OUT>
 			// register a timeout for this AsyncStreamRecordBufferEntry
 			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
 
-			getProcessingTimeService().registerTimer(
+			final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
 				timeoutTimestamp,
 				new ProcessingTimeCallback() {
 					@Override
@@ -212,6 +214,15 @@ public class AsyncWaitOperator<IN, OUT>
 							new TimeoutException("Async function call has timed out."));
 					}
 				});
+
+			// Cancel the timer once we've completed the stream record buffer entry. This will remove
+			// the register trigger task
+			streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
+				@Override
+				public void accept(StreamElementQueueEntry<Collection<OUT>> value) {
+					timerFuture.cancel(true);
+				}
+			}, executor);
 		}
 
 		addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/36c7de1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 34a4c56..15715da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -48,10 +48,13 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -75,12 +78,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -805,4 +811,84 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			super.close();
 		}
 	}
+
+	/**
+	 * FLINK-5652
+	 * Tests that registered timers are properly canceled upon completion of a
+	 * {@link StreamRecordQueueEntry} in order to avoid resource leaks because TriggerTasks hold
+	 * a reference on the StreamRecordQueueEntry.
+	 */
+	@Test
+	public void testTimeoutCleanup() throws Exception {
+		final Object lock = new Object();
+
+		final long timeout = 100000L;
+		final long timestamp = 1L;
+
+		Environment environment = mock(Environment.class);
+		when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+		when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
+		when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
+		when(environment.getTaskInfo()).thenReturn(new TaskInfo(
+			"testTask",
+			1,
+			0,
+			1,
+			0));
+
+		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+
+		ProcessingTimeService processingTimeService = mock(ProcessingTimeService.class);
+		when(processingTimeService.getCurrentProcessingTime()).thenReturn(timestamp);
+		doReturn(scheduledFuture).when(processingTimeService).registerTimer(anyLong(), any(ProcessingTimeCallback.class));
+
+		StreamTask<?, ?> containingTask = mock(StreamTask.class);
+		when(containingTask.getEnvironment()).thenReturn(environment);
+		when(containingTask.getCheckpointLock()).thenReturn(lock);
+		when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);
+
+		StreamConfig streamConfig = mock(StreamConfig.class);
+		doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+
+		Output<StreamRecord<Integer>> output = mock(Output.class);
+
+		AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+			new AsyncFunction<Integer, Integer>() {
+				private static final long serialVersionUID = -3718276118074877073L;
+
+				@Override
+				public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+					collector.collect(Collections.singletonList(input));
+				}
+			},
+			timeout,
+			1,
+			AsyncDataStream.OutputMode.UNORDERED);
+
+		operator.setup(
+			containingTask,
+			streamConfig,
+			output);
+
+		operator.open();
+
+		final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, timestamp);
+
+		synchronized (lock) {
+			// processing an element will register a timeout
+			operator.processElement(streamRecord);
+		}
+
+		synchronized (lock) {
+			// closing the operator waits until all inputs have been processed
+			operator.close();
+		}
+
+		// check that we actually outputted the result of the single input
+		verify(output).collect(eq(streamRecord));
+		verify(processingTimeService).registerTimer(eq(processingTimeService.getCurrentProcessingTime() + timeout), any(ProcessingTimeCallback.class));
+
+		// check that we have cancelled our registered timeout
+		verify(scheduledFuture).cancel(eq(true));
+	}
 }