You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:33:34 UTC

[04/10] flink git commit: [FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6

[FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6

This closes #5870.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f619f22d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f619f22d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f619f22d

Branch: refs/heads/release-1.5
Commit: f619f22dc9febb5d94152c165d0a5bb8cb70049e
Parents: 4af40d7
Author: zentol <ch...@apache.org>
Authored: Tue Apr 17 15:24:22 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 25 09:33:18 2018 +0200

----------------------------------------------------------------------
 .../runtime/NetworkStackThroughputITCase.java   | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f619f22d/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index e6401c0..3b93ca2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -19,10 +19,11 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -32,9 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -234,43 +232,51 @@ public class NetworkStackThroughputITCase extends TestLogger {
 
 			final int numTaskManagers = parallelism / numSlotsPerTaskManager;
 
-			final LocalFlinkMiniCluster localFlinkMiniCluster = TestBaseUtils.startCluster(
-				numTaskManagers,
-				numSlotsPerTaskManager,
-				false,
-				false,
-				true);
+			final MiniClusterResource cluster = new MiniClusterResource(
+				new MiniClusterResource.MiniClusterResourceConfiguration(
+					new Configuration(),
+					numTaskManagers,
+					numSlotsPerTaskManager
+				),
+				true
+			);
+			cluster.before();
 
 			try {
-				System.out.println(Arrays.toString(p));
+				System.out.println(String.format("Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
+					dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager));
 				testProgram(
-					localFlinkMiniCluster,
+					cluster,
 					dataVolumeGb,
 					useForwarder,
 					isSlowSender,
 					isSlowReceiver,
 					parallelism);
 			} finally {
-				TestBaseUtils.stopCluster(localFlinkMiniCluster, FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
+				cluster.after();
 			}
 		}
 	}
 
 	private void testProgram(
-			LocalFlinkMiniCluster localFlinkMiniCluster,
+			final MiniClusterResource cluster,
 			final int dataVolumeGb,
 			final boolean useForwarder,
 			final boolean isSlowSender,
 			final boolean isSlowReceiver,
 			final int parallelism) throws Exception {
-		JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
+		ClusterClient<?> client = cluster.getClusterClient();
+		client.setDetached(false);
+		client.setPrintStatusDuringExecution(false);
+
+		JobExecutionResult jer = (JobExecutionResult) client.submitJob(
 			createJobGraph(
 				dataVolumeGb,
 				useForwarder,
 				isSlowSender,
 				isSlowReceiver,
 				parallelism),
-			false);
+			getClass().getClassLoader());
 
 		long dataVolumeMbit = dataVolumeGb * 8192;
 		long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);