You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/08 22:58:31 UTC

[gobblin] branch master updated: remove use of deprecated helix class (#3424)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2776796  remove use of deprecated helix class (#3424)
2776796 is described below

commit 2776796e9dd983a2e790e90e31eef9cd17dc1b71
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Nov 8 14:58:23 2021 -0800

    remove use of deprecated helix class (#3424)
    
    codestyle changes
---
 .../gobblin/aws/GobblinAWSClusterManager.java      |  8 +--
 .../gobblin/cluster/GobblinClusterManager.java     | 23 ++------
 .../gobblin/cluster/GobblinHelixMultiManager.java  | 69 +++++++++-------------
 .../cluster/TestShutdownMessageHandlerFactory.java |  6 +-
 ...ControllerUserDefinedMessageHandlerFactory.java | 22 +++----
 .../modules/core/GobblinServiceManager.java        |  9 +--
 .../gobblin/yarn/GobblinApplicationMaster.java     | 10 ++--
 7 files changed, 60 insertions(+), 87 deletions(-)

diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSClusterManager.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSClusterManager.java
index 17865df..ebd0dc3 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSClusterManager.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSClusterManager.java
@@ -30,7 +30,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,10 +83,10 @@ public class GobblinAWSClusterManager extends GobblinClusterManager {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private static class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+  private static class ControllerUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -122,7 +122,7 @@ public class GobblinAWSClusterManager extends GobblinClusterManager {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         LOGGER.warn(String
             .format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(),
                 this._message.getMsgSubType()));
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 3fb37ee..258ef2a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -38,7 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
@@ -310,12 +309,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
 
       // Need this in case a kill is issued to the process so that the idle thread does not keep the process up
       // since GobblinClusterManager.stop() is not called this case.
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        @Override
-        public void run() {
-          GobblinClusterManager.this.stopIdleProcessThread = true;
-        }
-      });
+      Runtime.getRuntime().addShutdownHook(new Thread(() -> GobblinClusterManager.this.stopIdleProcessThread = true));
     } else {
       startAppLauncherAndServices();
     }
@@ -403,12 +397,12 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   }
 
   /**
-   * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
+   * Creates and returns a {@link MultiTypeMessageHandlerFactory} for handling of Helix
    * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
    *
-   * @returns a {@link MessageHandlerFactory}.
+   * @return a {@link MultiTypeMessageHandlerFactory}.
    */
-  protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new GobblinHelixMultiManager.ControllerUserDefinedMessageHandlerFactory();
   }
 
@@ -434,12 +428,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   @VisibleForTesting
   void initializeHelixManager() {
     this.multiManager = new GobblinHelixMultiManager(
-        this.config, new Function<Void, MessageHandlerFactory>() {
-          @Override
-          public MessageHandlerFactory apply(Void aVoid) {
-            return GobblinClusterManager.this.getUserDefinedMessageHandlerFactory();
-          }
-        }, this.eventBus, stopStatus) ;
+        this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus) ;
     this.multiManager.addLeadershipChangeAwareComponent(this);
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index 5e2b727..d85e1f1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -28,17 +28,17 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.TargetState;
@@ -123,25 +123,19 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
    */
   private boolean dedicatedJobClusterController = true;
 
-  /**
-   * Create a dedicated controller for planning job distribution.
-   */
-  private boolean dedicatedTaskDriverClusterController = true;
-
   @Getter
   boolean isLeader = false;
   boolean isStandaloneMode = false;
-  private GobblinClusterManager.StopStatus stopStatus;
-  private Config config;
-  private EventBus eventBus;
-  private final MetricContext metricContext;
+  private final GobblinClusterManager.StopStatus stopStatus;
+  private final Config config;
+  private final EventBus eventBus;
   private final HelixManagerMetrics metrics;
-  private final MessageHandlerFactory userDefinedMessageHandlerFactory;
-  private List<LeadershipChangeAwareComponent> leadershipChangeAwareComponents = Lists.newArrayList();
+  private final MultiTypeMessageHandlerFactory userDefinedMessageHandlerFactory;
+  private final List<LeadershipChangeAwareComponent> leadershipChangeAwareComponents = Lists.newArrayList();
 
   public GobblinHelixMultiManager(
       Config config,
-      Function<Void, MessageHandlerFactory> messageHandlerFactoryFunction,
+      Function<Void, MultiTypeMessageHandlerFactory> messageHandlerFactoryFunction,
       EventBus eventBus,
       GobblinClusterManager.StopStatus stopStatus) {
     this.config = config;
@@ -149,8 +143,8 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
     this.stopStatus = stopStatus;
     this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
-    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
-    this.metrics = new HelixManagerMetrics(this.metricContext, this.config);
+    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+    this.metrics = new HelixManagerMetrics(metricContext, this.config);
     this.dedicatedManagerCluster = ConfigUtils.getBoolean(config,
        GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,false);
     this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config,
@@ -212,13 +206,15 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
             GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
             InstanceType.ADMINISTRATOR));
 
-        this.dedicatedTaskDriverClusterController = ConfigUtils.getBoolean(
-            this.config,
-            GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED,
-            true);
+        /*
+         * Whether to create a dedicated controller for planning job distribution.
+         */
+        boolean dedicatedTaskDriverClusterController = ConfigUtils
+            .getBoolean(this.config, GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED,
+                true);
 
-        // This will creat a dedicated controller for planning job distribution
-        if (this.dedicatedTaskDriverClusterController) {
+        // This will create a dedicated controller for planning job distribution
+        if (dedicatedTaskDriverClusterController) {
           this.taskDriverClusterController = Optional.of(GobblinHelixMultiManager
               .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
                   InstanceType.CONTROLLER));
@@ -266,12 +262,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
       // standalone mode listens for controller change
       if (this.isStandaloneMode) {
         // Subscribe to leadership changes
-        this.managerClusterHelixManager.addControllerListener(new ControllerChangeListener() {
-          @Override
-          public void onControllerChange(NotificationContext changeContext) {
-            handleLeadershipChange(changeContext);
-          }
-        });
+        this.managerClusterHelixManager.addControllerListener((ControllerChangeListener) this::handleLeadershipChange);
       }
     } catch (Exception e) {
       log.error("HelixManager failed to connect", e);
@@ -369,9 +360,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
   public void cleanUpJobs() {
     cleanUpJobs(this.jobClusterHelixManager);
 
-    if (this.taskDriverHelixManager.isPresent()) {
-      cleanUpJobs(this.taskDriverHelixManager.get());
-    }
+    this.taskDriverHelixManager.ifPresent(this::cleanUpJobs);
   }
 
   private void cleanUpJobs(HelixManager helixManager) {
@@ -409,10 +398,10 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link MessageHandler}s that handle messages of type
    * "SHUTDOWN" for shutting down the controller.
    */
-  private class ControllerShutdownMessageHandlerFactory implements MessageHandlerFactory {
+  private class ControllerShutdownMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -444,7 +433,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         String messageSubType = this._message.getMsgSubType();
         Preconditions.checkArgument(
             messageSubType.equalsIgnoreCase(HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()),
@@ -494,10 +483,10 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  static class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+  static class ControllerUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -534,7 +523,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         log.warn(String
             .format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(),
                 this._message.getMsgSubType()));
@@ -556,9 +545,9 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
   /**
    * Helix related metrics
    */
-  private class HelixManagerMetrics extends StandardMetricsBridge.StandardMetrics {
+  private static class HelixManagerMetrics extends StandardMetricsBridge.StandardMetrics {
     public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange";
-    private ContextAwareHistogram clusterLeadershipChange;
+    private final ContextAwareHistogram clusterLeadershipChange;
     public HelixManagerMetrics(final MetricContext metricContext, final Config config) {
       int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
       this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
index cd805bf..cd22339 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
@@ -23,17 +23,17 @@ import java.util.List;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.testng.Assert;
 
 
 /**
- * A test implementation of {@link MessageHandlerFactory}.
+ * A test implementation of {@link MultiTypeMessageHandlerFactory}.
  *
  * @author Yinan Li
  */
-public class TestShutdownMessageHandlerFactory implements MessageHandlerFactory {
+public class TestShutdownMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
   private final HelixMessageTestBase helixMessageTestBase;
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index 41e7a33..6e9cff7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -27,7 +27,7 @@ import java.util.Properties;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 
 import lombok.AllArgsConstructor;
@@ -44,15 +44,15 @@ import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 
 /**
- * A custom {@link MessageHandlerFactory} for {@link org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory}s that
+ * A custom {@link MultiTypeMessageHandlerFactory} for {@link org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory}s that
  * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
  */
 @AllArgsConstructor
-class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
-  private boolean flowCatalogLocalCommit;
-  private GobblinServiceJobScheduler jobScheduler;
-  private FlowConfigsResourceHandler resourceHandler;
-  private String serviceName;
+class ControllerUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+  private final boolean flowCatalogLocalCommit;
+  private final GobblinServiceJobScheduler jobScheduler;
+  private final FlowConfigsResourceHandler resourceHandler;
+  private final String serviceName;
 
   @Override
   public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -78,10 +78,10 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
    */
   @Slf4j
   private static class ControllerUserDefinedMessageHandler extends MessageHandler {
-    private boolean flowCatalogLocalCommit;
-    private GobblinServiceJobScheduler jobScheduler;
-    private FlowConfigsResourceHandler resourceHandler;
-    private String serviceName;
+    private final boolean flowCatalogLocalCommit;
+    private final GobblinServiceJobScheduler jobScheduler;
+    private final FlowConfigsResourceHandler resourceHandler;
+    private final String serviceName;
 
     public ControllerUserDefinedMessageHandler(Message message, NotificationContext context, String serviceName,
         boolean flowCatalogLocalCommit, GobblinServiceJobScheduler scheduler,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index e0b4d4d..c7fc958 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -34,9 +34,9 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -417,12 +417,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
 
     if (this.helixManager.isPresent()) {
       // Subscribe to leadership changes
-      this.helixManager.get().addControllerListener(new ControllerChangeListener() {
-        @Override
-        public void onControllerChange(NotificationContext changeContext) {
-          handleLeadershipChange(changeContext);
-        }
-      });
+      this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange);
 
 
       // Update for first time since there might be no notification
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 9706314..e64bb21 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,15 +131,15 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
   }
 
   @Override
-  protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new ControllerUserDefinedMessageHandlerFactory();
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+  private class ControllerUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -178,7 +178,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         String messageSubType = this._message.getMsgSubType();
 
         if (messageSubType.equalsIgnoreCase(HelixMessageSubTypes.TOKEN_FILE_UPDATED.toString())) {