You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/04 11:04:47 UTC
flink git commit: [hotfix] [tests] Simplify mocking of the
ResultPartitionWriter
Repository: flink
Updated Branches:
refs/heads/master cd610a99e -> 67c2868b3
[hotfix] [tests] Simplify mocking of the ResultPartitionWriter
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67c2868b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67c2868b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67c2868b
Branch: refs/heads/master
Commit: 67c2868b33a5ed8430b38ac32c65444315b0658b
Parents: cd610a9
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 3 20:18:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 4 12:04:22 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBAsyncSnapshotTest.java | 8 +++++---
.../io/network/api/writer/ResultPartitionWriter.java | 2 +-
.../api/streamtask/StreamIterationHeadTest.java | 10 +---------
.../runtime/operators/StreamTaskTimerTest.java | 11 +----------
.../operators/TestProcessingTimeServiceTest.java | 11 +----------
.../streaming/runtime/tasks/OneInputStreamTaskTest.java | 12 +++---------
.../runtime/tasks/OneInputStreamTaskTestHarness.java | 4 ----
.../streaming/runtime/tasks/SourceStreamTaskTest.java | 10 +---------
.../streaming/runtime/tasks/StreamTaskTestHarness.java | 2 +-
.../streaming/runtime/tasks/TwoInputStreamTaskTest.java | 11 ++---------
.../runtime/tasks/TwoInputStreamTaskTestHarness.java | 4 ----
11 files changed, 16 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 09979b8..1c5a91c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -49,15 +48,18 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OperatingSystem;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
+
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -79,8 +81,8 @@ import static org.junit.Assert.assertNotNull;
* Tests for asynchronous RocksDB Key/Value state checkpoints.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@PrepareForTest({FileSystem.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
@SuppressWarnings("serial")
public class RocksDBAsyncSnapshotTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 79c21c6..cfab34d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
* The {@link ResultPartitionWriter} is the runtime API for producing results. It
* supports two kinds of data to be sent: buffers and events.
*/
-public final class ResultPartitionWriter implements EventListener<TaskEvent> {
+public class ResultPartitionWriter implements EventListener<TaskEvent> {
private final ResultPartition partition;
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
index a047ed4..36cf53a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -18,21 +18,14 @@
package org.apache.flink.streaming.api.streamtask;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
+
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.*;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ ResultPartitionWriter.class })
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class StreamIterationHeadTest {
@Test
@@ -49,5 +42,4 @@ public class StreamIterationHeadTest {
assertEquals(1, harness.getOutput().size());
assertEquals(new Watermark(Long.MAX_VALUE), harness.getOutput().peek());
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index f23c6d2..e0e0e91 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -20,21 +20,15 @@ package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
@@ -43,9 +37,6 @@ import static org.junit.Assert.*;
/**
* Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class StreamTaskTimerTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index a3b231b..cd1f253 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -19,29 +19,20 @@
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class TestProcessingTimeServiceTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d31990a..42d7cec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.tasks;
+package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
@@ -33,7 +33,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,12 +50,10 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -86,9 +83,6 @@ import static org.junit.Assert.fail;
* used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all
* OneInputStreamOperators.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class OneInputStreamTaskTest extends TestLogger {
private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR =
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 3cf055e..3126d71 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -48,10 +48,6 @@ import java.io.IOException;
* of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
* queues are empty. This must be used after entering some elements before checking the
* desired output.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})}
*/
public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 21c894e..b592fe8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,19 +23,15 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import java.io.Serializable;
import java.util.List;
@@ -51,12 +47,8 @@ import java.util.concurrent.atomic.AtomicLong;
* These tests verify that the RichFunction methods are called (in correct order). And that
* checkpointing/element emission don't occur concurrently.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*" })
public class SourceStreamTaskTest {
-
/**
* This test verifies that open() and close() are correctly called by the StreamTask.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index ab7bf69..c531c0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -117,7 +117,7 @@ public class StreamTaskTestHarness<OUT> {
if (!(task instanceof StreamTask)) {
throw new UnsupportedOperationException("getProcessingTimeService() only supported on StreamTasks.");
}
- return ((StreamTask) task).getProcessingTimeService();
+ return ((StreamTask<?, ?>) task).getProcessingTimeService();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 1bb3fb0..5aca7d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.tasks;
+package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -29,12 +28,9 @@ import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -49,9 +45,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all
* TwoInputStreamOperators.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class TwoInputStreamTaskTest {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 0e7565e..edb1642 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -54,10 +54,6 @@ import java.util.List;
* of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
* queues are empty. This must be used after entering some elements before checking the
* desired output.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})}
*/
public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {