You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/03/23 18:12:08 UTC

[6/7] flink git commit: [FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6

[FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6

This closes #5722.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c56e191
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c56e191
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c56e191

Branch: refs/heads/master
Commit: 0c56e1917aa3a563a7425ba98ff33ed9bfcd22c5
Parents: 0623e24
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 15:16:18 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100

----------------------------------------------------------------------
 ...cyTaskCancelAsyncProducerConsumerITCase.java | 287 +++++++++++++++++++
 .../TaskCancelAsyncProducerConsumerITCase.java  |  82 +++---
 2 files changed, 329 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c56e191/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
new file mode 100644
index 0000000..ee0bfda
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class LegacyTaskCancelAsyncProducerConsumerITCase extends TestLogger {
+
+	// Exceptions caught by the async producer/consumer Threads, read by the test after cancellation. NOTE(review): never reset between runs in the same JVM
+	private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
+	private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
+
+	// The spawned async producer/consumer Threads, published by invoke() so the test Thread can inspect them
+	private static volatile Thread ASYNC_PRODUCER_THREAD;
+	private static volatile Thread ASYNC_CONSUMER_THREAD;
+
+	/**
+	 * Tests that a task whose main Thread waits on an async producer/consumer
+	 * Thread stuck in a blocking buffer request can be cancelled (variant running on the legacy {@link TestingCluster}).
+	 *
+	 * <p>This is currently required for the Flink Kafka sources, which spawn
+	 * a separate Thread consuming from Kafka and producing the intermediate
+	 * streams in the spawned Thread instead of the main task Thread.
+	 */
+	@Test
+	public void testCancelAsyncProducerAndConsumer() throws Exception {
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		TestingCluster flink = null;
+
+		try {
+			// Cluster: 1 TaskManager, 1 slot, 9 network buffers of 4096 bytes (presumably kept tiny so the producer exhausts them quickly)
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+
+			flink = new TestingCluster(config, true);
+			flink.start();
+
+			// Job: AsyncProducer -> AsyncConsumer over a POINTWISE, PIPELINED edge, both in one slot sharing group
+			JobVertex producer = new JobVertex("AsyncProducer");
+			producer.setParallelism(1);
+			producer.setInvokableClass(AsyncProducer.class);
+
+			JobVertex consumer = new JobVertex("AsyncConsumer");
+			consumer.setParallelism(1);
+			consumer.setInvokableClass(AsyncConsumer.class);
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
+			producer.setSlotSharingGroup(slot);
+			consumer.setSlotSharingGroup(slot);
+
+			JobGraph jobGraph = new JobGraph(producer, consumer);
+
+			// Submit the job detached and wait until all of its vertices are running
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+			flink.submitJobDetached(jobGraph);
+
+			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(runningFuture, deadline.timeLeft());
+
+			// Subscribe to the CANCELED status before cancelling (presumably so the transition cannot be missed), then wait for the blocking requests
+			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
+			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+
+			boolean producerBlocked = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_PRODUCER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					StackTraceElement[] stackTrace = thread.getStackTrace();
+					producerBlocked = isInBlockingBufferRequest(stackTrace);
+				}
+
+				if (producerBlocked) {
+					break;
+				} else {
+					// Not blocked yet (or Thread not published): poll again, at most 50 times every 500 ms
+					Thread.sleep(500L);
+				}
+			}
+
+			// Verify that async producer is in blocking request. NOTE(review): the message eagerly dereferences ASYNC_PRODUCER_THREAD, so a never-published Thread yields an NPE rather than this assertion failure
+			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked);
+
+			boolean consumerWaiting = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_CONSUMER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					consumerWaiting = thread.getState() == Thread.State.WAITING;
+				}
+
+				if (consumerWaiting) {
+					break;
+				} else {
+					// Not WAITING yet (or Thread not published): poll again, at most 50 times every 500 ms
+					Thread.sleep(500L);
+				}
+			}
+
+			// Verify that the async consumer Thread is parked (Thread.State.WAITING), presumably inside getNextBufferOrEvent()
+			assertTrue("Consumer thread is not blocked.", consumerWaiting);
+
+			msg = new CancelJob(jobGraph.getJobID());
+			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(cancelFuture, deadline.timeLeft());
+
+			Await.ready(cancelledFuture, deadline.timeLeft());
+
+			// Cancellation must have surfaced in both async Threads as an IllegalStateException
+			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
+
+			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
+		} finally {
+			if (flink != null) {
+				flink.stop();
+			}
+		}
+	}
+
+	/**
+	 * Invokable emitting an increasing sequence of {@link LongValue} records from a
+	 * separate Thread (not the main Task thread) into the task's first writer.
+	 */
+	public static class AsyncProducer extends AbstractInvokable {
+
+		public AsyncProducer(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread producer = new ProducerThread(getEnvironment().getWriter(0));
+
+			// Publish the async producer so the test Thread can inspect its stack trace
+			ASYNC_PRODUCER_THREAD = producer;
+
+			producer.start();
+
+			// Wait for the producer Thread to finish. This runs in the main Task thread;
+			// interrupts (e.g. on cancellation) are ignored and the join is retried while the producer is alive.
+			while (producer.isAlive()) {
+				try {
+					producer.join();
+				} catch (InterruptedException ignored) {
+				}
+			}
+		}
+
+		/**
+		 * The Thread emitting records (flushing after each) until the writer throws; the exception is stored in ASYNC_PRODUCER_EXCEPTION.
+		 */
+		private static class ProducerThread extends Thread {
+
+			private final RecordWriter<LongValue> recordWriter;
+
+			public ProducerThread(ResultPartitionWriter partitionWriter) {
+				this.recordWriter = new RecordWriter<>(partitionWriter);
+			}
+
+			@Override
+			public void run() {
+				LongValue current = new LongValue(0);
+
+				try {
+					while (true) {
+						current.setValue(current.getValue() + 1);
+						recordWriter.emit(current);
+						recordWriter.flushAll();
+					}
+				} catch (Exception e) {
+					ASYNC_PRODUCER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Invokable draining buffers/events from the task's first input gate in a separate
+	 * Thread (not the main Task thread).
+	 */
+	public static class AsyncConsumer extends AbstractInvokable {
+
+		public AsyncConsumer(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread consumer = new ConsumerThread(getEnvironment().getInputGate(0));
+
+			// Publish the async consumer so the test Thread can check its Thread state
+			ASYNC_CONSUMER_THREAD = consumer;
+
+			consumer.start();
+
+			// Wait for the consumer Thread to finish. This runs in the main Task thread;
+			// interrupts (e.g. on cancellation) are ignored and the join is retried while the consumer is alive.
+			while (consumer.isAlive()) {
+				try {
+					consumer.join();
+				} catch (InterruptedException ignored) {
+				}
+			}
+		}
+
+		/**
+		 * The Thread requesting buffers/events until the input gate throws; the exception is stored in ASYNC_CONSUMER_EXCEPTION.
+		 */
+		private static class ConsumerThread extends Thread {
+
+			private final InputGate inputGate;
+
+			public ConsumerThread(InputGate inputGate) {
+				this.inputGate = inputGate;
+			}
+
+			@Override
+			public void run() {
+				try {
+					while (true) {
+						inputGate.getNextBufferOrEvent();
+					}
+				} catch (Exception e) {
+					ASYNC_CONSUMER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c56e191/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index c63af83..4b73b09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -33,28 +34,26 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+@Category(Flip6.class)
 public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 
 	// The Exceptions thrown by the producer/consumer Threads
@@ -75,18 +74,20 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 	 */
 	@Test
 	public void testCancelAsyncProducerAndConsumer() throws Exception {
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
-		TestingCluster flink = null;
-
-		try {
-			// Cluster
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
-
-			flink = new TestingCluster(config, true);
+		Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
+
+		// Cluster
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+
+		MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		try (MiniCluster flink = new MiniCluster(miniClusterConfiguration)) {
 			flink.start();
 
 			// Job with async producer and consumer
@@ -106,16 +107,15 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			JobGraph jobGraph = new JobGraph(producer, consumer);
 
 			// Submit job and wait until running
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-			flink.submitJobDetached(jobGraph);
-
-			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
-			Await.ready(runningFuture, deadline.timeLeft());
+			flink.runDetached(jobGraph);
 
-			// Wait for blocking requests, cancel and wait for cancellation
-			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
-			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> flink.getJobStatus(jobGraph.getJobID()),
+				Time.milliseconds(10),
+				deadline,
+				status -> status == JobStatus.RUNNING,
+				TestingUtils.defaultScheduledExecutor()
+			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			boolean producerBlocked = false;
 			for (int i = 0; i < 50; i++) {
@@ -156,11 +156,17 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			// Verify that async consumer is in blocking request
 			assertTrue("Consumer thread is not blocked.", consumerWaiting);
 
-			msg = new CancelJob(jobGraph.getJobID());
-			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
-			Await.ready(cancelFuture, deadline.timeLeft());
+			flink.cancelJob(jobGraph.getJobID())
+				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			Await.ready(cancelledFuture, deadline.timeLeft());
+			// wait until the job is canceled
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> flink.getJobStatus(jobGraph.getJobID()),
+				Time.milliseconds(10),
+				deadline,
+				status -> status == JobStatus.CANCELED,
+				TestingUtils.defaultScheduledExecutor()
+			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			// Verify the expected Exceptions
 			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
@@ -168,10 +174,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 
 			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
 			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
-		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
 		}
 	}