You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/04/07 22:23:36 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1110] fix deadlock in job cancellation replacing deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory

This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 713e1d9  [GOBBLIN-1110] fix deadlock in job cancellation replacing deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory
713e1d9 is described below

commit 713e1d96c00a0e7dda58649991c09b2b24de68b0
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Apr 7 15:23:25 2020 -0700

    [GOBBLIN-1110] fix deadlock in job cancellation
    replacing deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory
    
    Closes #2950 from arjun4084346/taskDriverStop
---
 .../apache/gobblin/aws/GobblinAWSTaskRunner.java   |  7 ++--
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  4 +--
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 42 +++++++++++-----------
 .../org/apache/gobblin/metrics/GobblinMetrics.java |  6 +---
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 ++
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 12 +++----
 6 files changed, 37 insertions(+), 36 deletions(-)

diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
index 4bff1ff..0e8f1eb 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
@@ -30,6 +30,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,15 +89,15 @@ public class GobblinAWSTaskRunner extends GobblinTaskRunner {
   }
 
   @Override
-  public MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  public MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new ParticipantUserDefinedMessageHandlerFactory();
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private static class ParticipantUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+  private static class ParticipantUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 418ce62..33758b0 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -255,9 +255,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
           // work flow should never be deleted explicitly because it has a expiry time
           // If cancellation is requested, we should set the job state to CANCELLED/ABORT
           this.helixTaskDriver.waitToStop(this.helixWorkFlowName, this.helixJobStopTimeoutSeconds);
-          log.info("stopped the workflow ", this.helixWorkFlowName);
+          log.info("stopped the workflow {}", this.helixWorkFlowName);
         }
-      } catch (HelixException e) {
+      } catch (RuntimeException e) {
         // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
         // We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception
         log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 7ed555e..8e6cdd1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -55,7 +55,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
@@ -172,7 +172,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
     logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
 
-    //Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
+    // Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
     // overrides such as sessionTimeout. In this case, the overrides specified
     // in the application configuration have to be extracted and set before initializing HelixManager.
     HelixUtils.setSystemProperties(config);
@@ -182,7 +182,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     this.containerMetrics = buildContainerMetrics();
 
     logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
-        this.isTaskDriver? "taskDriver" : "worker",
+        this.isTaskDriver ? "taskDriver" : "worker",
         applicationName,
         helixInstanceName,
         applicationId,
@@ -324,7 +324,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }
 
   public synchronized void stop() {
-    if (this.isStopped || this.stopInProgress) {
+    if (this.isStopped) {
+      logger.info("Gobblin Task runner is already stopped.");
+      return;
+    }
+
+    if (this.stopInProgress) {
+      logger.info("Gobblin Task runner stop already in progress.");
       return;
     }
 
@@ -340,8 +346,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     try {
       stopServices();
     } finally {
+      logger.info("All services are stopped.");
       this.taskStateModelFactory.shutdown();
-
       disconnectHelixManager();
     }
 
@@ -471,12 +477,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }
 
   /**
-   * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
+   * Creates and returns a {@link MultiTypeMessageHandlerFactory} for handling of Helix
    * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
    *
-   * @returns a {@link MessageHandlerFactory}.
+   * @return a {@link MultiTypeMessageHandlerFactory}.
    */
-  protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new ParticipantUserDefinedMessageHandlerFactory();
   }
 
@@ -518,10 +524,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages
    * of type "SHUTDOWN" for shutting down the participants.
    */
-  private class ParticipantShutdownMessageHandlerFactory implements MessageHandlerFactory {
+  private class ParticipantShutdownMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -553,13 +559,11 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
       }
 
       @Override
-      public HelixTaskResult handleMessage()
-          throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         String messageSubType = this._message.getMsgSubType();
         Preconditions.checkArgument(messageSubType
             .equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()), String
-            .format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-                messageSubType));
+            .format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType));
 
         HelixTaskResult result = new HelixTaskResult();
 
@@ -568,8 +572,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
           return result;
         }
 
-        logger
-            .info("Handling message " + HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
+        logger.info("Handling message " + HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
 
         ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
             MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
@@ -607,10 +610,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private static class ParticipantUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+  private static class ParticipantUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -647,8 +650,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
       }
 
       @Override
-      public HelixTaskResult handleMessage()
-          throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         logger.warn(String.format("No handling setup for %s message of subtype: %s",
             Message.MessageType.USER_DEFINE_MSG.toString(), this._message.getMsgSubType()));
 
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index f75da78..e969083 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -771,15 +771,13 @@ public class GobblinMetrics {
           return;
         }
         this.codahaleReportersCloser.register(scheduledReporter);
-        String reporterSinkMsg = reporterSink.isPresent()?"to " + reporterSink.get():"";
+        String reporterSinkMsg = reporterSink.isPresent() ? "to " + reporterSink.get() : "";
         LOGGER.info("Will start reporting metrics " + reporterSinkMsg + " using " + reporterClass);
         this.codahaleScheduledReporters.add(scheduledReporter);
-
       } else if (CustomReporterFactory.class.isAssignableFrom(clazz)) {
         CustomReporterFactory customReporterFactory = ((CustomReporterFactory) clazz.getConstructor().newInstance());
         customReporterFactory.newScheduledReporter(properties);
         LOGGER.info("Will start reporting metrics using " + reporterClass);
-
       } else {
         throw new IllegalArgumentException("Class " + reporterClass +
             " specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + " must implement: "
@@ -789,11 +787,9 @@ public class GobblinMetrics {
       LOGGER.warn(String
           .format("Failed to create metric reporter: requested CustomReporterFactory %s not found.", reporterClass),
           exception);
-
     } catch (NoSuchMethodException exception) {
       LOGGER.warn(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
           + "does not have parameterless constructor.", reporterClass), exception);
-
     } catch (Exception exception) {
       LOGGER.warn("Could not create metric reporter from builder " + reporterClass + ".", exception);
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 8ec6fed..7d3081b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -714,6 +714,8 @@ public abstract class AbstractJobLauncher implements JobLauncher {
 
   /**
    * Execute the job cancellation.
+   * Implementations must not throw any exception: an uncaught exception kills the
+   * {@code Cancellation Executor} thread and leads to a deadlock.
    */
   protected abstract void executeCancellation();
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d59a429..8e88b2e 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +84,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
             services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
                 new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
         } catch (Exception e) {
-          LOGGER.warn("Cannot add LogCopier service to the service manager due to {}", e);
+          LOGGER.warn("Cannot add LogCopier service to the service manager due to", e);
         }
       }
     }
@@ -92,15 +92,15 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
   }
 
   @Override
-  public MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  public MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new ParticipantUserDefinedMessageHandlerFactory();
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private class ParticipantUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+  private class ParticipantUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -139,7 +139,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         String messageSubType = this._message.getMsgSubType();
 
         if (messageSubType.equalsIgnoreCase(org.apache.gobblin.cluster.HelixMessageSubTypes.TOKEN_FILE_UPDATED.toString())) {