You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/04/07 22:23:36 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1110] fix deadlock in job cancellation; replace deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory
This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 713e1d9 [GOBBLIN-1110] fix deadlock in job cancellation replacing deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory
713e1d9 is described below
commit 713e1d96c00a0e7dda58649991c09b2b24de68b0
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Apr 7 15:23:25 2020 -0700
[GOBBLIN-1110] Fix deadlock in job cancellation; also replace the
deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory.
Closes #2950 from arjun4084346/taskDriverStop
---
.../apache/gobblin/aws/GobblinAWSTaskRunner.java | 7 ++--
.../gobblin/cluster/GobblinHelixJobLauncher.java | 4 +--
.../apache/gobblin/cluster/GobblinTaskRunner.java | 42 +++++++++++-----------
.../org/apache/gobblin/metrics/GobblinMetrics.java | 6 +---
.../gobblin/runtime/AbstractJobLauncher.java | 2 ++
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 12 +++----
6 files changed, 37 insertions(+), 36 deletions(-)
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
index 4bff1ff..0e8f1eb 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
@@ -30,6 +30,7 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,15 +89,15 @@ public class GobblinAWSTaskRunner extends GobblinTaskRunner {
}
@Override
- public MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+ public MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
return new ParticipantUserDefinedMessageHandlerFactory();
}
/**
- * A custom {@link MessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
+ * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
*/
- private static class ParticipantUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+ private static class ParticipantUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 418ce62..33758b0 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -255,9 +255,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
// work flow should never be deleted explicitly because it has a expiry time
// If cancellation is requested, we should set the job state to CANCELLED/ABORT
this.helixTaskDriver.waitToStop(this.helixWorkFlowName, this.helixJobStopTimeoutSeconds);
- log.info("stopped the workflow ", this.helixWorkFlowName);
+ log.info("stopped the workflow {}", this.helixWorkFlowName);
}
- } catch (HelixException e) {
+ } catch (RuntimeException e) {
// Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
// We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception
log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 7ed555e..8e6cdd1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -55,7 +55,7 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModelFactory;
@@ -172,7 +172,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
- //Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
+ // Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
// overrides such as sessionTimeout. In this case, the overrides specified
// in the application configuration have to be extracted and set before initializing HelixManager.
HelixUtils.setSystemProperties(config);
@@ -182,7 +182,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.containerMetrics = buildContainerMetrics();
logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
- this.isTaskDriver? "taskDriver" : "worker",
+ this.isTaskDriver ? "taskDriver" : "worker",
applicationName,
helixInstanceName,
applicationId,
@@ -324,7 +324,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
public synchronized void stop() {
- if (this.isStopped || this.stopInProgress) {
+ if (this.isStopped) {
+ logger.info("Gobblin Task runner is already stopped.");
+ return;
+ }
+
+ if (this.stopInProgress) {
+ logger.info("Gobblin Task runner stop already in progress.");
return;
}
@@ -340,8 +346,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
try {
stopServices();
} finally {
+ logger.info("All services are stopped.");
this.taskStateModelFactory.shutdown();
-
disconnectHelixManager();
}
@@ -471,12 +477,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
/**
- * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
+ * Creates and returns a {@link MultiTypeMessageHandlerFactory} for handling of Helix
* {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
*
- * @returns a {@link MessageHandlerFactory}.
+ * @returns a {@link MultiTypeMessageHandlerFactory}.
*/
- protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
return new ParticipantUserDefinedMessageHandlerFactory();
}
@@ -518,10 +524,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
/**
- * A custom {@link MessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages
+ * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages
* of type "SHUTDOWN" for shutting down the participants.
*/
- private class ParticipantShutdownMessageHandlerFactory implements MessageHandlerFactory {
+ private class ParticipantShutdownMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -553,13 +559,11 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
@Override
- public HelixTaskResult handleMessage()
- throws InterruptedException {
+ public HelixTaskResult handleMessage() {
String messageSubType = this._message.getMsgSubType();
Preconditions.checkArgument(messageSubType
.equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()), String
- .format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
- messageSubType));
+ .format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType));
HelixTaskResult result = new HelixTaskResult();
@@ -568,8 +572,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
return result;
}
- logger
- .info("Handling message " + HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
+ logger.info("Handling message " + HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
@@ -607,10 +610,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
/**
- * A custom {@link MessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
+ * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
*/
- private static class ParticipantUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+ private static class ParticipantUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -647,8 +650,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
@Override
- public HelixTaskResult handleMessage()
- throws InterruptedException {
+ public HelixTaskResult handleMessage() {
logger.warn(String.format("No handling setup for %s message of subtype: %s",
Message.MessageType.USER_DEFINE_MSG.toString(), this._message.getMsgSubType()));
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index f75da78..e969083 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -771,15 +771,13 @@ public class GobblinMetrics {
return;
}
this.codahaleReportersCloser.register(scheduledReporter);
- String reporterSinkMsg = reporterSink.isPresent()?"to " + reporterSink.get():"";
+ String reporterSinkMsg = reporterSink.isPresent() ? "to " + reporterSink.get() : "";
LOGGER.info("Will start reporting metrics " + reporterSinkMsg + " using " + reporterClass);
this.codahaleScheduledReporters.add(scheduledReporter);
-
} else if (CustomReporterFactory.class.isAssignableFrom(clazz)) {
CustomReporterFactory customReporterFactory = ((CustomReporterFactory) clazz.getConstructor().newInstance());
customReporterFactory.newScheduledReporter(properties);
LOGGER.info("Will start reporting metrics using " + reporterClass);
-
} else {
throw new IllegalArgumentException("Class " + reporterClass +
" specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + " must implement: "
@@ -789,11 +787,9 @@ public class GobblinMetrics {
LOGGER.warn(String
.format("Failed to create metric reporter: requested CustomReporterFactory %s not found.", reporterClass),
exception);
-
} catch (NoSuchMethodException exception) {
LOGGER.warn(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
+ "does not have parameterless constructor.", reporterClass), exception);
-
} catch (Exception exception) {
LOGGER.warn("Could not create metric reporter from builder " + reporterClass + ".", exception);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 8ec6fed..7d3081b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -714,6 +714,8 @@ public abstract class AbstractJobLauncher implements JobLauncher {
/**
* Execute the job cancellation.
+ * The implementation should not throw any exceptions because that will kill the `Cancellation Executor` thread
+ * and will create a deadlock.
*/
protected abstract void executeCancellation();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d59a429..8e88b2e 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +84,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
} catch (Exception e) {
- LOGGER.warn("Cannot add LogCopier service to the service manager due to {}", e);
+ LOGGER.warn("Cannot add LogCopier service to the service manager due to", e);
}
}
}
@@ -92,15 +92,15 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
}
@Override
- public MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+ public MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
return new ParticipantUserDefinedMessageHandlerFactory();
}
/**
- * A custom {@link MessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
+ * A custom {@link MultiTypeMessageHandlerFactory} for {@link ParticipantUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
*/
- private class ParticipantUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
+ private class ParticipantUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -139,7 +139,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
}
@Override
- public HelixTaskResult handleMessage() throws InterruptedException {
+ public HelixTaskResult handleMessage() {
String messageSubType = this._message.getMsgSubType();
if (messageSubType.equalsIgnoreCase(org.apache.gobblin.cluster.HelixMessageSubTypes.TOKEN_FILE_UPDATED.toString())) {