You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2022/11/24 12:15:19 UTC
[flink] branch master updated: [FLINK-29779] Pass PluginManager into MiniCluster
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9984b09cb9a [FLINK-29779] Pass PluginManager into MiniCluster
9984b09cb9a is described below
commit 9984b09cb9a2af9f2bde0e973cb8ce375942bd8c
Author: Robert Metzger <rm...@decodable.co>
AuthorDate: Wed Oct 26 15:34:17 2022 +0200
[FLINK-29779] Pass PluginManager into MiniCluster
---
.../flink/runtime/minicluster/MiniCluster.java | 3 ++-
.../minicluster/MiniClusterConfiguration.java | 21 +++++++++++++++++++--
.../TestingMiniClusterConfiguration.java | 3 ++-
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bc21f6108f6..bab16dd21f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -1103,7 +1103,8 @@ public class MiniCluster implements AutoCloseableAsync {
Configuration config, long maximumMessageSizeInBytes) {
return new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(config, maximumMessageSizeInBytes),
- ReporterSetup.fromConfiguration(config, null));
+ ReporterSetup.fromConfiguration(
+ config, miniClusterConfiguration.getPluginManager()));
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 51be3944f3c..9b2b2bddc48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.util.Preconditions;
@@ -50,6 +51,8 @@ public class MiniClusterConfiguration {
private final MiniCluster.HaServices haServices;
+ @Nullable private final PluginManager pluginManager;
+
// ------------------------------------------------------------------------
// Construction
// ------------------------------------------------------------------------
@@ -59,12 +62,14 @@ public class MiniClusterConfiguration {
int numTaskManagers,
RpcServiceSharing rpcServiceSharing,
@Nullable String commonBindAddress,
- MiniCluster.HaServices haServices) {
+ MiniCluster.HaServices haServices,
+ @Nullable PluginManager pluginManager) {
this.numTaskManagers = numTaskManagers;
this.configuration = generateConfiguration(Preconditions.checkNotNull(configuration));
this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
this.commonBindAddress = commonBindAddress;
this.haServices = haServices;
+ this.pluginManager = pluginManager;
}
private UnmodifiableConfiguration generateConfiguration(final Configuration configuration) {
@@ -100,6 +105,11 @@ public class MiniClusterConfiguration {
return rpcServiceSharing;
}
+ @Nullable
+ public PluginManager getPluginManager() {
+ return pluginManager;
+ }
+
public int getNumTaskManagers() {
return numTaskManagers;
}
@@ -176,6 +186,7 @@ public class MiniClusterConfiguration {
@Nullable private String commonBindAddress = null;
private MiniCluster.HaServices haServices = MiniCluster.HaServices.CONFIGURED;
private boolean useRandomPorts = false;
+ @Nullable private PluginManager pluginManager;
public Builder setConfiguration(Configuration configuration1) {
this.configuration = Preconditions.checkNotNull(configuration1);
@@ -212,6 +223,11 @@ public class MiniClusterConfiguration {
return this;
}
+ public Builder setPluginManager(PluginManager pluginManager) {
+ this.pluginManager = Preconditions.checkNotNull(pluginManager);
+ return this;
+ }
+
public MiniClusterConfiguration build() {
final Configuration modifiedConfiguration = new Configuration(configuration);
modifiedConfiguration.setInteger(
@@ -234,7 +250,8 @@ public class MiniClusterConfiguration {
numTaskManagers,
rpcServiceSharing,
commonBindAddress,
- haServices);
+ haServices,
+ pluginManager);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
index a9b456909d9..9333b294212 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
@@ -55,7 +55,8 @@ public class TestingMiniClusterConfiguration extends MiniClusterConfiguration {
numTaskManagers,
rpcServiceSharing,
commonBindAddress,
- MiniCluster.HaServices.CONFIGURED);
+ MiniCluster.HaServices.CONFIGURED,
+ null);
this.numberDispatcherResourceManagerComponents = numberDispatcherResourceManagerComponents;
this.localCommunication = localCommunication;
}