You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 22:17:49 UTC
flink git commit: [FLINK-7444] [rpc] Make external calls non-blocking
Repository: flink
Updated Branches:
refs/heads/master ab1fbfdfe -> a3df5a2ca
[FLINK-7444] [rpc] Make external calls non-blocking
Make all external calls from the RpcEndpoint's main thread non-blocking by
executing them as a Runnable in an Executor.
Make FatalErrorHandler calls synchronous (remove the onFatalErrorAsync wrappers).
This closes #4540.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3df5a2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3df5a2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3df5a2c
Branch: refs/heads/master
Commit: a3df5a2ca52880b6681446d95f1d916a01f55681
Parents: ab1fbfd
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Aug 14 14:48:33 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 4 00:17:23 2017 +0200
----------------------------------------------------------------------
.../clusterframework/MesosResourceManager.java | 4 +-
.../flink/runtime/dispatcher/Dispatcher.java | 22 ++----
.../flink/runtime/jobmaster/JobMaster.java | 25 +++----
.../flink/runtime/minicluster/MiniCluster.java | 27 ++++----
.../resourcemanager/ResourceManager.java | 73 ++++++++------------
.../flink/runtime/rpc/FatalErrorHandler.java | 11 +++
.../runtime/taskexecutor/TaskExecutor.java | 45 ++++--------
.../flink/yarn/YarnClusterDescriptorV2.java | 7 --
.../apache/flink/yarn/YarnResourceManager.java | 2 +-
9 files changed, 89 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 9a2ad42..8a8f208 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -416,7 +416,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
// tell the launch coordinator to launch the new tasks
launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
} catch (Exception ex) {
- onFatalErrorAsync(new ResourceManagerException("Unable to request new workers.", ex));
+ onFatalError(new ResourceManagerException("Unable to request new workers.", ex));
}
}
@@ -447,7 +447,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
}
}
catch (Exception e) {
- onFatalErrorAsync(new ResourceManagerException("Unable to release a worker.", e));
+ onFatalError(new ResourceManagerException("Unable to release a worker.", e));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 29262cd..e7e1ec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -390,48 +390,40 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
public void jobFinished(JobExecutionResult result) {
log.info("Job {} finished.", jobId);
- runAsync(new Runnable() {
- @Override
- public void run() {
+ runAsync(() -> {
try {
removeJob(jobId, true);
} catch (Exception e) {
log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
}
- }
- });
+ });
}
@Override
public void jobFailed(Throwable cause) {
log.info("Job {} failed.", jobId);
- runAsync(new Runnable() {
- @Override
- public void run() {
+ runAsync(() -> {
try {
removeJob(jobId, true);
} catch (Exception e) {
log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
}
- }
- });
+ });
}
@Override
public void jobFinishedByOther() {
log.info("Job {} was finished by other JobManager.", jobId);
- runAsync(new Runnable() {
- @Override
- public void run() {
+ runAsync(
+ () -> {
try {
removeJob(jobId, false);
} catch (Exception e) {
log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
}
- }
- });
+ });
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7e48da1..3e66a34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -877,20 +877,13 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
//----------------------------------------------------------------------------------------------
private void handleFatalError(final Throwable cause) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
- try {
- shutDown();
- } catch (Exception e) {
- cause.addSuppressed(e);
- }
+ try {
+ log.error("Fatal error occurred on JobManager.", cause);
+ } catch (Throwable ignore) {}
- errorHandler.onFatalError(cause);
- }
- });
+ // The fatal error handler implementation should make sure that this call is non-blocking
+ errorHandler.onFatalError(cause);
}
private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
@@ -910,7 +903,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
- jobCompletionActions.jobFinished(result);
+ executor.execute(() -> jobCompletionActions.jobFinished(result));
}
catch (Exception e) {
log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
@@ -920,7 +913,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
"The job is registered as 'FINISHED (successful), but this notification describes " +
"a failure, since the resulting accumulators could not be fetched.", e);
- jobCompletionActions.jobFailed(exception);
+ executor.execute(() ->jobCompletionActions.jobFailed(exception));
}
break;
@@ -928,7 +921,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
final JobExecutionException exception = new JobExecutionException(
jobID, "Job was cancelled.", new Exception("The job was cancelled"));
- jobCompletionActions.jobFailed(exception);
+ executor.execute(() -> jobCompletionActions.jobFailed(exception));
break;
}
@@ -936,7 +929,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
final JobExecutionException exception = new JobExecutionException(
jobID, "Job execution failed.", unpackedError);
- jobCompletionActions.jobFailed(exception);
+ executor.execute(() -> jobCompletionActions.jobFailed(exception));
break;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e36e9e..9a4a76a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -95,15 +95,13 @@ public class MiniCluster {
@GuardedBy("lock")
private ResourceManagerRunner[] resourceManagerRunners;
- @GuardedBy("lock")
- private TaskExecutor[] taskManagers;
+ private volatile TaskExecutor[] taskManagers;
@GuardedBy("lock")
private MiniClusterJobDispatcher jobDispatcher;
/** Flag marking the mini cluster as started/running */
- @GuardedBy("lock")
- private boolean running;
+ private volatile boolean running;
// ------------------------------------------------------------------------
@@ -150,6 +148,8 @@ public class MiniCluster {
@Deprecated
public MiniCluster(Configuration config, boolean singleRpcService) {
this(createConfig(config, singleRpcService));
+
+ running = false;
}
// ------------------------------------------------------------------------
@@ -645,17 +645,18 @@ public class MiniCluster {
@Override
public void onFatalError(Throwable exception) {
- LOG.error("TaskManager #{} failed.", index, exception);
+ // first check if we are still running
+ if (running) {
+ LOG.error("TaskManager #{} failed.", index, exception);
- try {
- synchronized (lock) {
- // note: if not running (after shutdown) taskManagers may be null!
- if (running && taskManagers[index] != null) {
- taskManagers[index].shutDown();
- }
+ // let's check if there are still TaskManagers because there could be a concurrent
+ // shut down operation taking place
+ TaskExecutor[] currentTaskManagers = taskManagers;
+
+ if (currentTaskManagers != null) {
+ // the shutDown is asynchronous
+ currentTaskManagers[index].shutDown();
}
- } catch (Exception e) {
- LOG.error("TaskManager #{} could not be properly terminated.", index, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a9a9e50..6b2c898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -35,24 +35,23 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-
import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
@@ -76,7 +75,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
* and bookkeeping.
*
- * It offers the following methods as part of its rpc interface to interact with him remotely:
+ * <p>It offers the following methods as part of its rpc interface to interact with him remotely:
* <ul>
* <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(UUID, UUID, SlotRequest, Time)} requests a slot from the resource manager</li>
@@ -88,10 +87,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
- /** Unique id of the resource manager */
+ /** Unique id of the resource manager. */
private final ResourceID resourceId;
- /** Configuration of the resource manager */
+ /** Configuration of the resource manager. */
private final ResourceManagerConfiguration resourceManagerConfiguration;
/** All currently registered JobMasterGateways scoped by JobID. */
@@ -100,7 +99,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/** All currently registered JobMasterGateways scoped by ResourceID. */
private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
- /** Service to retrieve the job leader ids */
+ /** Service to retrieve the job leader ids. */
private final JobLeaderIdService jobLeaderIdService;
/** All currently registered TaskExecutors with there framework specific worker information. */
@@ -115,13 +114,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/** The heartbeat manager with job managers. */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
- /** Registry to use for metrics */
+ /** Registry to use for metrics. */
private final MetricRegistry metricRegistry;
- /** Fatal error handler */
+ /** Fatal error handler. */
private final FatalErrorHandler fatalErrorHandler;
- /** The slot manager maintains the available slots */
+ /** The slot manager maintains the available slots. */
private final SlotManager slotManager;
/** The service to elect a ResourceManager leader. */
@@ -268,7 +267,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
jobId + " to the job id leader service.", e);
- onFatalErrorAsync(exception);
+ onFatalError(exception);
log.error("Could not add job {} to job leader id service.", jobId, e);
return FutureUtils.completedExceptionally(exception);
@@ -287,7 +286,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
"job leader id future to verify the correct job leader.", e);
- onFatalErrorAsync(exception);
+ onFatalError(exception);
log.debug("Could not obtain the job leader id future to verify the correct job leader.");
return FutureUtils.completedExceptionally(exception);
@@ -346,7 +345,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
- * Register a {@link TaskExecutor} at the resource manager
+ * Register a {@link TaskExecutor} at the resource manager.
*
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
* @param taskExecutorAddress The address of the TaskExecutor that registers
@@ -454,7 +453,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
- * Notification from a TaskExecutor that a slot has become available
+ * Notification from a TaskExecutor that a slot has become available.
+ *
* @param resourceManagerLeaderId TaskExecutor's resource manager leader id
* @param instanceID TaskExecutor's instance id
* @param slotId The slot id of the available slot
@@ -491,13 +491,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
- * Registers an info message listener
+ * Registers an info message listener.
*
* @param address address of infoMessage listener to register to this resource manager
*/
@Override
public void registerInfoMessageListener(final String address) {
- if(infoMessageListeners.containsKey(address)) {
+ if (infoMessageListeners.containsKey(address)) {
log.warn("Receive a duplicate registration from info message listener on ({})", address);
} else {
CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService()
@@ -517,7 +517,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
- * Unregisters an info message listener
+ * Unregisters an info message listener.
*
* @param address of the info message listener to unregister from this resource manager
*
@@ -528,7 +528,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
- * Cleanup application and shut down cluster
+ * Cleanup application and shut down cluster.
*
* @param finalStatus of the Flink application
* @param optionalDiagnostics for the Flink application
@@ -825,28 +825,15 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/**
* Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
- * This method should be used when asynchronous threads want to notify the
- * ResourceManager of a fatal error.
- *
- * @param t The exception describing the fatal error
- */
- protected void onFatalErrorAsync(final Throwable t) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- onFatalError(t);
- }
- });
- }
-
- /**
- * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
- * This method must only be called from within the ResourceManager's main thread.
*
* @param t The exception describing the fatal error
*/
protected void onFatalError(Throwable t) {
- log.error("Fatal error occurred.", t);
+ try {
+ log.error("Fatal error occurred in ResourceManager.", t);
+ } catch (Throwable ignored) {}
+
+ // The fatal error handler implementation should make sure that this call is non-blocking
fatalErrorHandler.onFatalError(t);
}
@@ -855,7 +842,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
// ------------------------------------------------------------------------
/**
- * Callback method when current resourceManager is granted leadership
+ * Callback method when current resourceManager is granted leadership.
*
* @param newLeaderSessionID unique leadershipID
*/
@@ -904,13 +891,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
- * Handles error occurring in the leader election service
+ * Handles error occurring in the leader election service.
*
* @param exception Exception being thrown in the leader election service
*/
@Override
public void handleError(final Exception exception) {
- onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
+ onFatalError(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
}
// ------------------------------------------------------------------------
@@ -928,7 +915,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
* The framework specific code for shutting down the application. This should report the
* application's final status and shut down the resource manager cleanly.
*
- * This method also needs to make sure all pending containers that are not registered
+ * <p>This method also needs to make sure all pending containers that are not registered
* yet are returned.
*
* @param finalStatus The application status to report.
@@ -1029,7 +1016,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
@Override
public void handleError(Throwable error) {
- onFatalErrorAsync(error);
+ onFatalError(error);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
index 7721117..dbccaa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
@@ -18,7 +18,18 @@
package org.apache.flink.runtime.rpc;
+/**
+ * Handler for fatal errors.
+ */
public interface FatalErrorHandler {
+ /**
+ * Being called when a fatal error occurs.
+ *
+ * <p>IMPORTANT: This call should never be blocking since it might be called from within
+ * the main thread of an {@link RpcEndpoint}.
+ *
+ * @param exception cause
+ */
void onFatalError(Throwable exception);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a5ce84b..21bdeec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -231,7 +231,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
try {
haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
} catch (Exception e) {
- onFatalErrorAsync(e);
+ onFatalError(e);
}
// tell the task slot table who's responsible for the task slot actions
@@ -1117,35 +1117,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
/**
* Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
- * This method should be used when asynchronous threads want to notify the
- * TaskExecutor of a fatal error.
- *
- * @param t The exception describing the fatal error
- */
- void onFatalErrorAsync(final Throwable t) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- onFatalError(t);
- }
- });
- }
-
- /**
- * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
- * This method must only be called from within the TaskExecutor's main thread.
*
* @param t The exception describing the fatal error
*/
void onFatalError(final Throwable t) {
- log.error("Fatal error occurred.", t);
- // this could potentially be a blocking call -> call asynchronously:
- getRpcService().execute(new Runnable() {
- @Override
- public void run() {
- fatalErrorHandler.onFatalError(t);
- }
- });
+ try {
+ log.error("Fatal error occurred in TaskExecutor.", t);
+ } catch (Throwable ignored) {}
+
+ // The fatal error handler implementation should make sure that this call is non-blocking
+ fatalErrorHandler.onFatalError(t);
}
// ------------------------------------------------------------------------
@@ -1183,7 +1164,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void handleError(Exception exception) {
- onFatalErrorAsync(exception);
+ onFatalError(exception);
}
}
@@ -1223,7 +1204,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void handleError(Throwable throwable) {
- onFatalErrorAsync(throwable);
+ onFatalError(throwable);
}
}
@@ -1245,7 +1226,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void onRegistrationFailure(Throwable failure) {
- onFatalErrorAsync(failure);
+ onFatalError(failure);
}
}
@@ -1270,7 +1251,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void notifyFatalError(String message, Throwable cause) {
- log.error(message, cause);
+ try {
+ log.error(message, cause);
+ } catch (Throwable ignored) {}
+
+ // The fatal error handler implementation should make sure that this call is non-blocking
fatalErrorHandler.onFatalError(cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index 00b73a8..3e58da5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -18,9 +18,7 @@
package org.apache.flink.yarn;
-import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
@@ -45,9 +43,4 @@ public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
protected String getYarnJobClusterEntrypoint() {
return YarnJobClusterEntrypoint.class.getName();
}
-
- @Override
- public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
- throw new UnsupportedOperationException("Cannot yet deploy a per-job yarn cluster.");
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c3398c4..dd12fef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -293,7 +293,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
@Override
public void onError(Throwable error) {
- onFatalErrorAsync(error);
+ onFatalError(error);
}
//Utility methods