You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/11/12 15:26:08 UTC
[flink] branch release-1.14 updated: [FLINK-24552][tests] Moved
randomization of buffer debloat from StreamEnvironment to
MiniClusterResource
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new e94f4de [FLINK-24552][tests] Moved randomization of buffer debloat from StreamEnvironment to MiniClusterResource
e94f4de is described below
commit e94f4de24b47c8b231ceb209b096e24dcfe2524e
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Oct 27 17:48:19 2021 +0200
[FLINK-24552][tests] Moved randomization of buffer debloat from StreamEnvironment to MiniClusterResource
The reason we moved it is that it is not possible to configure it via StreamEnvironment.
This closes #17581
---
.../runtime/testutils/MiniClusterResource.java | 25 +++++++-
.../streaming/util/TestStreamEnvironment.java | 70 ++++++++++++----------
2 files changed, 63 insertions(+), 32 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index d61a666..c6799f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -45,8 +45,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize;
+
/** Resource which starts a {@link MiniCluster} for testing purposes. */
public class MiniClusterResource extends ExternalResource {
+ private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG =
+ Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", "false"));
private static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = MemorySize.parse("80m");
@@ -105,7 +109,8 @@ public class MiniClusterResource extends ExternalResource {
.toMilliseconds()));
final List<CompletableFuture<Acknowledge>> jobCancellationFutures =
- miniCluster.listJobs()
+ miniCluster
+ .listJobs()
.get(
jobCancellationDeadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS)
@@ -120,7 +125,8 @@ public class MiniClusterResource extends ExternalResource {
CommonTestUtils.waitUntilCondition(
() -> {
final long unfinishedJobs =
- miniCluster.listJobs()
+ miniCluster
+ .listJobs()
.get(
jobCancellationDeadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS)
@@ -186,6 +192,8 @@ public class MiniClusterResource extends ExternalResource {
configuration.setInteger(JobManagerOptions.PORT, 0);
configuration.setString(RestOptions.BIND_PORT, "0");
+ randomizeConfiguration(configuration);
+
final MiniClusterConfiguration miniClusterConfiguration =
new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
@@ -206,6 +214,19 @@ public class MiniClusterResource extends ExternalResource {
createClientConfiguration(restAddress);
}
+ /**
+ * This is the place for randomizing configuration that relates to task execution, such as
+ * TaskManagerConf. Configuration which relates to streaming should be randomized in
+ * TestStreamEnvironment#randomizeConfiguration.
+ */
+ private static void randomizeConfiguration(Configuration configuration) {
+ // randomize ITTests for enabling buffer de-bloating
+ if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG
+ && !configuration.contains(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) {
+ randomize(configuration, TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false);
+ }
+ }
+
private void createClientConfiguration(URI restAddress) {
Configuration restClientConfig = new Configuration();
restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 8df47f0..be095f8 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.util;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
@@ -45,8 +45,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
Boolean.parseBoolean(System.getProperty("checkpointing.randomization", "false"));
private static final String STATE_CHANGE_LOG_CONFIG =
System.getProperty("checkpointing.changelog", STATE_CHANGE_LOG_CONFIG_UNSET).trim();
- private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG =
- Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", "false"));
public TestStreamEnvironment(
MiniCluster miniCluster,
@@ -86,33 +84,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
TestStreamEnvironment env =
new TestStreamEnvironment(
miniCluster, parallelism, jarFiles, classpaths);
- if (RANDOMIZE_CHECKPOINTING_CONFIG) {
- randomize(
- conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
- randomize(
- conf,
- ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
- Duration.ofSeconds(0),
- Duration.ofMillis(100),
- Duration.ofSeconds(2));
- }
- if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
- if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
- conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true);
- }
- } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(
- STATE_CHANGE_LOG_CONFIG_RAND)) {
- if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
- randomize(
- conf,
- CheckpointingOptions.ENABLE_STATE_CHANGE_LOG,
- true,
- false);
- }
- }
- if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG) {
- randomize(conf, TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false);
- }
+
+ randomizeConfiguration(miniCluster, conf);
+
env.configure(conf, env.getUserClassloader());
return env;
};
@@ -120,6 +94,42 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
initializeContextEnvironment(factory);
}
+ /**
+ * This is the place for randomizing configuration that relates to the DataStream API, such as
+ * ExecutionConf, CheckpointConf, StreamExecutionEnvironment. The list of these options can be
+ * found in {@link StreamExecutionEnvironment#configure(ReadableConfig, ClassLoader)}. All
+ * other configuration should be randomized in {@link
+ * org.apache.flink.runtime.testutils.MiniClusterResource#randomizeConfiguration(Configuration)}.
+ */
+ private static void randomizeConfiguration(MiniCluster miniCluster, Configuration conf) {
+ // randomize ITTests for enabling unaligned checkpoint
+ if (RANDOMIZE_CHECKPOINTING_CONFIG) {
+ randomize(conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
+ randomize(
+ conf,
+ ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
+ Duration.ofSeconds(0),
+ Duration.ofMillis(100),
+ Duration.ofSeconds(2));
+ }
+
+ // randomize ITTests for enabling state change log
+ if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
+ if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
+ conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true);
+ }
+ } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(
+ STATE_CHANGE_LOG_CONFIG_RAND)) {
+ if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
+ randomize(
+ conf,
+ CheckpointingOptions.ENABLE_STATE_CHANGE_LOG,
+ true,
+ false);
+ }
+ }
+ }
+
private static boolean isConfigurationSupportedByChangelog(Configuration configuration) {
return !configuration.get(LOCAL_RECOVERY);
}