You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/07 10:15:05 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #10542: [FLINK-12484][runtime] move checkpoint lock to source task

pnowojski commented on a change in pull request #10542: [FLINK-12484][runtime] move checkpoint lock to source task
URL: https://github.com/apache/flink/pull/10542#discussion_r376310911
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##########
 @@ -706,47 +680,6 @@ private void testAsyncTimeout(
 		}
 	}
 
-	/**
-	 * Test case for FLINK-5638: Tests that the async wait operator can be closed even if the
-	 * emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async
-	 * wait operators where the latter operator's queue is currently full).
-	 *
-	 * <p>Note that this test does not enforce the exact strict ordering because with the fix it is no
-	 * longer possible. However, it provokes the described situation without the fix.
-	 */
-	@Test
-	public void testClosingWithBlockedEmitter() throws Exception {
-
-		JobVertex chainedVertex = createChainedVertex(new MyAsyncFunction(), new EmitterBlockingFunction());
-
-		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness = new OneInputStreamTaskTestHarness<>(
-				OneInputStreamTask::new,
-				1, 1,
-				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-		testHarness.setupOutputForSingletonOperatorChain();
-
-		testHarness.taskConfig = chainedVertex.getConfiguration();
-
-		final StreamConfig streamConfig = testHarness.getStreamConfig();
-		final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
-		streamConfig.setStreamOperatorFactory(
-				operatorChainStreamConfig.getStreamOperatorFactory(AsyncWaitOperatorTest.class.getClassLoader()));
-
-		testHarness.invoke();
-		testHarness.waitForTaskRunning();
-		Object checkpointLock = testHarness.getTask().getCheckpointLock();
-		EmitterBlockingFunction.setLock(checkpointLock);
-
-		testHarness.processElement(new StreamRecord<>(42, 1L));
-
-		EmitterBlockingFunction.outputLatch.await();
-		testHarness.endInput();
-		EmitterBlockingFunction.closingLatch.trigger();
-		testHarness.waitForTaskCompletion();
-
-		assertEquals(emptyList(), new ArrayList<>(testHarness.getOutput()));
-	}
 
 Review comment:
   Do we have a test for closing the `AsyncWaitOperator` when it's yielding the execution?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services