You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:33:34 UTC
[04/10] flink git commit: [FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6
[FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6
This closes #5870.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f619f22d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f619f22d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f619f22d
Branch: refs/heads/release-1.5
Commit: f619f22dc9febb5d94152c165d0a5bb8cb70049e
Parents: 4af40d7
Author: zentol <ch...@apache.org>
Authored: Tue Apr 17 15:24:22 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 25 09:33:18 2018 +0200
----------------------------------------------------------------------
.../runtime/NetworkStackThroughputITCase.java | 40 +++++++++++---------
1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f619f22d/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index e6401c0..3b93ca2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -19,10 +19,11 @@
package org.apache.flink.test.runtime;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -32,9 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.junit.Ignore;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
@@ -234,43 +232,51 @@ public class NetworkStackThroughputITCase extends TestLogger {
final int numTaskManagers = parallelism / numSlotsPerTaskManager;
- final LocalFlinkMiniCluster localFlinkMiniCluster = TestBaseUtils.startCluster(
- numTaskManagers,
- numSlotsPerTaskManager,
- false,
- false,
- true);
+ final MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ new Configuration(),
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true
+ );
+ cluster.before();
try {
- System.out.println(Arrays.toString(p));
+ System.out.println(String.format("Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
+ dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager));
testProgram(
- localFlinkMiniCluster,
+ cluster,
dataVolumeGb,
useForwarder,
isSlowSender,
isSlowReceiver,
parallelism);
} finally {
- TestBaseUtils.stopCluster(localFlinkMiniCluster, FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
+ cluster.after();
}
}
}
private void testProgram(
- LocalFlinkMiniCluster localFlinkMiniCluster,
+ final MiniClusterResource cluster,
final int dataVolumeGb,
final boolean useForwarder,
final boolean isSlowSender,
final boolean isSlowReceiver,
final int parallelism) throws Exception {
- JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
+ ClusterClient<?> client = cluster.getClusterClient();
+ client.setDetached(false);
+ client.setPrintStatusDuringExecution(false);
+
+ JobExecutionResult jer = (JobExecutionResult) client.submitJob(
createJobGraph(
dataVolumeGb,
useForwarder,
isSlowSender,
isSlowReceiver,
parallelism),
- false);
+ getClass().getClassLoader());
long dataVolumeMbit = dataVolumeGb * 8192;
long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);