You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/22 18:27:49 UTC

[GitHub] pnowojski closed pull request #6892: [FLINK-10607][network][test] Unify to remove duplicated NoOpResultPartitionConsumableNotifier

pnowojski closed pull request #6892: [FLINK-10607][network][test] Unify to remove duplicated NoOpResultPartitionConsumableNotifier
URL: https://github.com/apache/flink/pull/6892
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would otherwise hide it once the pull request is merged):

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f790b5f02b9..f0f1926b008 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -22,8 +22,8 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -313,7 +313,7 @@ private static ResultPartition createResultPartition(
 			channels,
 			channels,
 			mock(ResultPartitionManager.class),
-			mock(ResultPartitionConsumableNotifier.class),
+			new NoOpResultPartitionConsumableNotifier(),
 			mock(IOManager.class),
 			false);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResultPartitionConsumableNotifier.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResultPartitionConsumableNotifier.java
new file mode 100644
index 00000000000..c666d649077
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResultPartitionConsumableNotifier.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Test implementation of {@link ResultPartitionConsumableNotifier}.
+ */
+public class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+	@Override
+	public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions) {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 2afd6d4aff9..448e989c6e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -105,8 +106,7 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception {
 			(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
 			TestBufferFactory.BUFFER_SIZE);
 
-		final ResultPartitionConsumableNotifier partitionConsumableNotifier =
-			mock(ResultPartitionConsumableNotifier.class);
+		final ResultPartitionConsumableNotifier partitionConsumableNotifier = new NoOpResultPartitionConsumableNotifier();
 
 		final TaskActions taskActions = mock(TaskActions.class);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 02d7a1da5a2..5ceb8236691 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -55,7 +55,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -714,7 +714,7 @@ public void testTaskSubmission() throws Exception {
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
 			libraryCacheManager,
-			mock(ResultPartitionConsumableNotifier.class),
+			new NoOpResultPartitionConsumableNotifier(),
 			mock(PartitionProducerStateChecker.class));
 
 		final JobManagerTable jobManagerTable = new JobManagerTable();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 3edf5f11975..a05a2e0e580 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -216,7 +217,7 @@ private Task createTask(Class<? extends AbstractInvokable> invokableClass) throw
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(new TestUserCodeClassLoader());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
 		TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 49e995e8d81..a354e2cc049 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -47,6 +47,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -250,7 +251,7 @@ public void testExecutionFailsInBlobsMissing() throws Exception {
 	public void testExecutionFailsInNetworkRegistration() throws Exception {
 		// mock a network manager that rejects registration
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-		final ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		final TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 
@@ -571,7 +572,7 @@ public void testTriggerPartitionStateUpdate() throws Exception {
 		final PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class);
 		final TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 
-		final ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		final NetworkEnvironment network = mock(NetworkEnvironment.class);
 		when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
@@ -941,7 +942,7 @@ private void setState(Task task, ExecutionState state) {
 			libraryCacheManager = mock(LibraryCacheManager.class);
 			when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
-			consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+			consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 			partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
 			final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c826e774ec2..88aff103790 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -46,7 +46,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -64,9 +64,6 @@
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.testutils.TestJvmProcess;
 import org.apache.flink.util.OperatingSystem;
@@ -269,12 +266,6 @@ public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, long i, Checkpo
 			public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
 		}
 
-		private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
-
-			@Override
-			public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
-		}
-
 		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
 
 			@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 1b769c80f0a..941b9372d1f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -39,8 +39,8 @@
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -316,10 +316,4 @@ public void triggerPartitionProducerStateCheck(
 		@Override
 		public void failExternally(Throwable cause) {}
 	}
-
-	private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
-
-		@Override
-		public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index beed46df399..d3ef2b286b3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -46,7 +46,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -288,7 +288,7 @@ private static Task createTask(
 				blobService.getPermanentBlobService()),
 			new TestingTaskManagerRuntimeInfo(),
 			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-			mock(ResultPartitionConsumableNotifier.class),
+			new NoOpResultPartitionConsumableNotifier(),
 			mock(PartitionProducerStateChecker.class),
 			mock(Executor.class));
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 09276986c58..95227a6a4b3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -45,7 +45,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -175,7 +175,7 @@ public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,
 			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-			mock(ResultPartitionConsumableNotifier.class),
+			new NoOpResultPartitionConsumableNotifier(),
 			mock(PartitionProducerStateChecker.class),
 			Executors.directExecutor());
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bc145799fb3..a593bd5e004 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -52,6 +52,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -892,7 +893,7 @@ public static Task createTask(
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
 		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index cd8a4fafd9a..51c15afeca9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -47,7 +47,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -254,7 +254,7 @@ private static Task createTask(
 					blobService.getPermanentBlobService()),
 				new TestingTaskManagerRuntimeInfo(),
 				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-				mock(ResultPartitionConsumableNotifier.class),
+				new NoOpResultPartitionConsumableNotifier(),
 				mock(PartitionProducerStateChecker.class),
 				Executors.directExecutor());
 	}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services