You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/05 08:40:23 UTC
[flink] 03/04: [hotfix][tests] Remove PowerMocking from StreamTaskTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e64071c21e8637909fef7761426e2e5e5bcf0285
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jul 3 14:24:03 2019 +0200
[hotfix][tests] Remove PowerMocking from StreamTaskTest
---
.../streaming/runtime/tasks/StreamTaskTest.java | 32 +++++++++++++---------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 189dd3d..80f47a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -110,6 +110,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
@@ -118,13 +119,9 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import java.io.Closeable;
import java.io.IOException;
@@ -170,9 +167,6 @@ import static org.mockito.Mockito.when;
/**
* Tests for {@link StreamTask}.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StreamTask.class)
-@PowerMockIgnore("org.apache.log4j.*")
@SuppressWarnings("deprecation")
public class StreamTaskTest extends TestLogger {
@@ -649,6 +643,7 @@ public class StreamTaskTest extends TestLogger {
*/
@Test
public void testOperatorClosingBeforeStopRunning() throws Throwable {
+ BlockingCloseStreamOperator.resetLatches();
Configuration taskConfiguration = new Configuration();
StreamConfig streamConfig = new StreamConfig(taskConfiguration);
streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
@@ -665,13 +660,13 @@ public class StreamTaskTest extends TestLogger {
RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
- BlockingCloseStreamOperator.IN_CLOSE.await();
+ BlockingCloseStreamOperator.inClose.await();
// check that the StreamTask is not yet in isRunning == false
assertTrue(task.streamTask.isRunning());
// let the operator finish its close operation
- BlockingCloseStreamOperator.FINISH_CLOSE.trigger();
+ BlockingCloseStreamOperator.finishClose.trigger();
task.waitForTaskCompletion(false);
@@ -804,15 +799,26 @@ public class StreamTaskTest extends TestLogger {
private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
private static final long serialVersionUID = -9042150529568008847L;
- public static final OneShotLatch IN_CLOSE = new OneShotLatch();
- public static final OneShotLatch FINISH_CLOSE = new OneShotLatch();
+ private static volatile OneShotLatch inClose;
+ private static volatile OneShotLatch finishClose;
@Override
public void close() throws Exception {
- IN_CLOSE.trigger();
- FINISH_CLOSE.await();
+ checkLatches();
+ inClose.trigger();
+ finishClose.await();
super.close();
}
+
+ private void checkLatches() {
+ Preconditions.checkNotNull(inClose);
+ Preconditions.checkNotNull(finishClose);
+ }
+
+ private static void resetLatches() {
+ inClose = new OneShotLatch();
+ finishClose = new OneShotLatch();
+ }
}
public static Task createTask(