You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2022/11/24 12:15:19 UTC

[flink] branch master updated: [FLINK-29779] Pass PluginManager into MiniCluster

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9984b09cb9a [FLINK-29779] Pass PluginManager into MiniCluster
9984b09cb9a is described below

commit 9984b09cb9a2af9f2bde0e973cb8ce375942bd8c
Author: Robert Metzger <rm...@decodable.co>
AuthorDate: Wed Oct 26 15:34:17 2022 +0200

    [FLINK-29779] Pass PluginManager into MiniCluster
---
 .../flink/runtime/minicluster/MiniCluster.java      |  3 ++-
 .../minicluster/MiniClusterConfiguration.java       | 21 +++++++++++++++++++--
 .../TestingMiniClusterConfiguration.java            |  3 ++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bc21f6108f6..bab16dd21f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -1103,7 +1103,8 @@ public class MiniCluster implements AutoCloseableAsync {
             Configuration config, long maximumMessageSizeInBytes) {
         return new MetricRegistryImpl(
                 MetricRegistryConfiguration.fromConfiguration(config, maximumMessageSizeInBytes),
-                ReporterSetup.fromConfiguration(config, null));
+                ReporterSetup.fromConfiguration(
+                        config, miniClusterConfiguration.getPluginManager()));
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 51be3944f3c..9b2b2bddc48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -50,6 +51,8 @@ public class MiniClusterConfiguration {
 
     private final MiniCluster.HaServices haServices;
 
+    @Nullable private final PluginManager pluginManager;
+
     // ------------------------------------------------------------------------
     //  Construction
     // ------------------------------------------------------------------------
@@ -59,12 +62,14 @@ public class MiniClusterConfiguration {
             int numTaskManagers,
             RpcServiceSharing rpcServiceSharing,
             @Nullable String commonBindAddress,
-            MiniCluster.HaServices haServices) {
+            MiniCluster.HaServices haServices,
+            @Nullable PluginManager pluginManager) {
         this.numTaskManagers = numTaskManagers;
         this.configuration = generateConfiguration(Preconditions.checkNotNull(configuration));
         this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
         this.commonBindAddress = commonBindAddress;
         this.haServices = haServices;
+        this.pluginManager = pluginManager;
     }
 
     private UnmodifiableConfiguration generateConfiguration(final Configuration configuration) {
@@ -100,6 +105,11 @@ public class MiniClusterConfiguration {
         return rpcServiceSharing;
     }
 
+    @Nullable
+    public PluginManager getPluginManager() {
+        return pluginManager;
+    }
+
     public int getNumTaskManagers() {
         return numTaskManagers;
     }
@@ -176,6 +186,7 @@ public class MiniClusterConfiguration {
         @Nullable private String commonBindAddress = null;
         private MiniCluster.HaServices haServices = MiniCluster.HaServices.CONFIGURED;
         private boolean useRandomPorts = false;
+        @Nullable private PluginManager pluginManager;
 
         public Builder setConfiguration(Configuration configuration1) {
             this.configuration = Preconditions.checkNotNull(configuration1);
@@ -212,6 +223,11 @@ public class MiniClusterConfiguration {
             return this;
         }
 
+        public Builder setPluginManager(PluginManager pluginManager) {
+            this.pluginManager = Preconditions.checkNotNull(pluginManager);
+            return this;
+        }
+
         public MiniClusterConfiguration build() {
             final Configuration modifiedConfiguration = new Configuration(configuration);
             modifiedConfiguration.setInteger(
@@ -234,7 +250,8 @@ public class MiniClusterConfiguration {
                     numTaskManagers,
                     rpcServiceSharing,
                     commonBindAddress,
-                    haServices);
+                    haServices,
+                    pluginManager);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
index a9b456909d9..9333b294212 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
@@ -55,7 +55,8 @@ public class TestingMiniClusterConfiguration extends MiniClusterConfiguration {
                 numTaskManagers,
                 rpcServiceSharing,
                 commonBindAddress,
-                MiniCluster.HaServices.CONFIGURED);
+                MiniCluster.HaServices.CONFIGURED,
+                null);
         this.numberDispatcherResourceManagerComponents = numberDispatcherResourceManagerComponents;
         this.localCommunication = localCommunication;
     }