You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/05 08:40:23 UTC

[flink] 03/04: [hotfix][tests] Remove PowerMocking from StreamTaskTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e64071c21e8637909fef7761426e2e5e5bcf0285
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jul 3 14:24:03 2019 +0200

    [hotfix][tests] Remove PowerMocking from StreamTaskTest
---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 189dd3d..80f47a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -110,6 +110,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.SupplierWithException;
@@ -118,13 +119,9 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -170,9 +167,6 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for {@link StreamTask}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StreamTask.class)
-@PowerMockIgnore("org.apache.log4j.*")
 @SuppressWarnings("deprecation")
 public class StreamTaskTest extends TestLogger {
 
@@ -649,6 +643,7 @@ public class StreamTaskTest extends TestLogger {
 	 */
 	@Test
 	public void testOperatorClosingBeforeStopRunning() throws Throwable {
+		BlockingCloseStreamOperator.resetLatches();
 		Configuration taskConfiguration = new Configuration();
 		StreamConfig streamConfig = new StreamConfig(taskConfiguration);
 		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
@@ -665,13 +660,13 @@ public class StreamTaskTest extends TestLogger {
 
 			RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
 
-			BlockingCloseStreamOperator.IN_CLOSE.await();
+			BlockingCloseStreamOperator.inClose.await();
 
 			// check that the StreamTask is not yet in isRunning == false
 			assertTrue(task.streamTask.isRunning());
 
 			// let the operator finish its close operation
-			BlockingCloseStreamOperator.FINISH_CLOSE.trigger();
+			BlockingCloseStreamOperator.finishClose.trigger();
 
 			task.waitForTaskCompletion(false);
 
@@ -804,15 +799,26 @@ public class StreamTaskTest extends TestLogger {
 	private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
 		private static final long serialVersionUID = -9042150529568008847L;
 
-		public static final OneShotLatch IN_CLOSE = new OneShotLatch();
-		public static final OneShotLatch FINISH_CLOSE = new OneShotLatch();
+		private static volatile OneShotLatch inClose;
+		private static volatile OneShotLatch finishClose;
 
 		@Override
 		public void close() throws Exception {
-			IN_CLOSE.trigger();
-			FINISH_CLOSE.await();
+			checkLatches();
+			inClose.trigger();
+			finishClose.await();
 			super.close();
 		}
+
+		private void checkLatches() {
+			Preconditions.checkNotNull(inClose);
+			Preconditions.checkNotNull(finishClose);
+		}
+
+		private static void resetLatches() {
+			inClose = new OneShotLatch();
+			finishClose = new OneShotLatch();
+		}
 	}
 
 	public static Task createTask(