You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 22:17:49 UTC

flink git commit: [FLINK-7444] [rpc] Make external calls non-blocking

Repository: flink
Updated Branches:
  refs/heads/master ab1fbfdfe -> a3df5a2ca


[FLINK-7444] [rpc] Make external calls non-blocking

Make all external calls from the RpcEndpoint's main thread non-blocking by
executing them as a Runnable in an Executor.

Make FatalErrorHandler calls synchronous: the handler is now invoked directly instead of via runAsync, so implementations must be non-blocking.

This closes #4540.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3df5a2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3df5a2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3df5a2c

Branch: refs/heads/master
Commit: a3df5a2ca52880b6681446d95f1d916a01f55681
Parents: ab1fbfd
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Aug 14 14:48:33 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 4 00:17:23 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  4 +-
 .../flink/runtime/dispatcher/Dispatcher.java    | 22 ++----
 .../flink/runtime/jobmaster/JobMaster.java      | 25 +++----
 .../flink/runtime/minicluster/MiniCluster.java  | 27 ++++----
 .../resourcemanager/ResourceManager.java        | 73 ++++++++------------
 .../flink/runtime/rpc/FatalErrorHandler.java    | 11 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 45 ++++--------
 .../flink/yarn/YarnClusterDescriptorV2.java     |  7 --
 .../apache/flink/yarn/YarnResourceManager.java  |  2 +-
 9 files changed, 89 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 9a2ad42..8a8f208 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -416,7 +416,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			// tell the launch coordinator to launch the new tasks
 			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
 		} catch (Exception ex) {
-			onFatalErrorAsync(new ResourceManagerException("Unable to request new workers.", ex));
+			onFatalError(new ResourceManagerException("Unable to request new workers.", ex));
 		}
 	}
 
@@ -447,7 +447,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			}
 		}
 		catch (Exception e) {
-			onFatalErrorAsync(new ResourceManagerException("Unable to release a worker.", e));
+			onFatalError(new ResourceManagerException("Unable to release a worker.", e));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 29262cd..e7e1ec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -390,48 +390,40 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		public void jobFinished(JobExecutionResult result) {
 			log.info("Job {} finished.", jobId);
 
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
+			runAsync(() -> {
 					try {
 						removeJob(jobId, true);
 					} catch (Exception e) {
 						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
 					}
-				}
-			});
+				});
 		}
 
 		@Override
 		public void jobFailed(Throwable cause) {
 			log.info("Job {} failed.", jobId);
 
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
+			runAsync(() -> {
 					try {
 						removeJob(jobId, true);
 					} catch (Exception e) {
 						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
 					}
-				}
-			});
+				});
 		}
 
 		@Override
 		public void jobFinishedByOther() {
 			log.info("Job {} was finished by other JobManager.", jobId);
 
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
+			runAsync(
+				() -> {
 					try {
 						removeJob(jobId, false);
 					} catch (Exception e) {
 						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
 					}
-				}
-			});
+				});
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7e48da1..3e66a34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -877,20 +877,13 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	//----------------------------------------------------------------------------------------------
 
 	private void handleFatalError(final Throwable cause) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
 
-				try {
-					shutDown();
-				} catch (Exception e) {
-					cause.addSuppressed(e);
-				}
+		try {
+			log.error("Fatal error occurred on JobManager.", cause);
+		} catch (Throwable ignore) {}
 
-				errorHandler.onFatalError(cause);
-			}
-		});
+		// The fatal error handler implementation should make sure that this call is non-blocking
+		errorHandler.onFatalError(cause);
 	}
 
 	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
@@ -910,7 +903,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 						Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
 						JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults); 
 
-						jobCompletionActions.jobFinished(result);
+						executor.execute(() -> jobCompletionActions.jobFinished(result));
 					}
 					catch (Exception e) {
 						log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
@@ -920,7 +913,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 								"The job is registered as 'FINISHED (successful), but this notification describes " +
 								"a failure, since the resulting accumulators could not be fetched.", e);
 
-						jobCompletionActions.jobFailed(exception);
+						executor.execute(() ->jobCompletionActions.jobFailed(exception));
 					}
 					break;
 
@@ -928,7 +921,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 					final JobExecutionException exception = new JobExecutionException(
 						jobID, "Job was cancelled.", new Exception("The job was cancelled"));
 
-					jobCompletionActions.jobFailed(exception);
+					executor.execute(() -> jobCompletionActions.jobFailed(exception));
 					break;
 				}
 
@@ -936,7 +929,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 					final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 					final JobExecutionException exception = new JobExecutionException(
 							jobID, "Job execution failed.", unpackedError);
-					jobCompletionActions.jobFailed(exception);
+					executor.execute(() -> jobCompletionActions.jobFailed(exception));
 					break;
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e36e9e..9a4a76a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -95,15 +95,13 @@ public class MiniCluster {
 	@GuardedBy("lock")
 	private ResourceManagerRunner[] resourceManagerRunners;
 
-	@GuardedBy("lock")
-	private TaskExecutor[] taskManagers;
+	private volatile TaskExecutor[] taskManagers;
 
 	@GuardedBy("lock")
 	private MiniClusterJobDispatcher jobDispatcher;
 
 	/** Flag marking the mini cluster as started/running */
-	@GuardedBy("lock")
-	private boolean running;
+	private volatile boolean running;
 
 	// ------------------------------------------------------------------------
 
@@ -150,6 +148,8 @@ public class MiniCluster {
 	@Deprecated
 	public MiniCluster(Configuration config, boolean singleRpcService) {
 		this(createConfig(config, singleRpcService));
+
+		running = false;
 	}
 
 	// ------------------------------------------------------------------------
@@ -645,17 +645,18 @@ public class MiniCluster {
 
 		@Override
 		public void onFatalError(Throwable exception) {
-			LOG.error("TaskManager #{} failed.", index, exception);
+			// first check if we are still running
+			if (running) {
+				LOG.error("TaskManager #{} failed.", index, exception);
 
-			try {
-				synchronized (lock) {
-					// note: if not running (after shutdown) taskManagers may be null!
-					if (running && taskManagers[index] != null) {
-						taskManagers[index].shutDown();
-					}
+				// let's check if there are still TaskManagers because there could be a concurrent
+				// shut down operation taking place
+				TaskExecutor[] currentTaskManagers = taskManagers;
+
+				if (currentTaskManagers != null) {
+					// the shutDown is asynchronous
+					currentTaskManagers[index].shutDown();
 				}
-			} catch (Exception e) {
-				LOG.error("TaskManager #{} could not be properly terminated.", index, e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a9a9e50..6b2c898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -35,24 +35,23 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-
 import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
@@ -76,7 +75,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
  * and bookkeeping.
  *
- * It offers the following methods as part of its rpc interface to interact with him remotely:
+ * <p>It offers the following methods as part of its rpc interface to interact with him remotely:
  * <ul>
  *     <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest, Time)} requests a slot from the resource manager</li>
@@ -88,10 +87,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
 
-	/** Unique id of the resource manager */
+	/** Unique id of the resource manager. */
 	private final ResourceID resourceId;
 
-	/** Configuration of the resource manager */
+	/** Configuration of the resource manager. */
 	private final ResourceManagerConfiguration resourceManagerConfiguration;
 
 	/** All currently registered JobMasterGateways scoped by JobID. */
@@ -100,7 +99,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** All currently registered JobMasterGateways scoped by ResourceID. */
 	private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
 
-	/** Service to retrieve the job leader ids */
+	/** Service to retrieve the job leader ids. */
 	private final JobLeaderIdService jobLeaderIdService;
 
 	/** All currently registered TaskExecutors with there framework specific worker information. */
@@ -115,13 +114,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** The heartbeat manager with job managers. */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
-	/** Registry to use for metrics */
+	/** Registry to use for metrics. */
 	private final MetricRegistry metricRegistry;
 
-	/** Fatal error handler */
+	/** Fatal error handler. */
 	private final FatalErrorHandler fatalErrorHandler;
 
-	/** The slot manager maintains the available slots */
+	/** The slot manager maintains the available slots. */
 	private final SlotManager slotManager;
 
 	/** The service to elect a ResourceManager leader. */
@@ -268,7 +267,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 					ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
 						jobId + " to the job id leader service.", e);
 
-					onFatalErrorAsync(exception);
+					onFatalError(exception);
 
 					log.error("Could not add job {} to job leader id service.", jobId, e);
 					return FutureUtils.completedExceptionally(exception);
@@ -287,7 +286,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
 					"job leader id future to verify the correct job leader.", e);
 
-				onFatalErrorAsync(exception);
+				onFatalError(exception);
 
 				log.debug("Could not obtain the job leader id future to verify the correct job leader.");
 				return FutureUtils.completedExceptionally(exception);
@@ -346,7 +345,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Register a {@link TaskExecutor} at the resource manager
+	 * Register a {@link TaskExecutor} at the resource manager.
 	 *
 	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
 	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
@@ -454,7 +453,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Notification from a TaskExecutor that a slot has become available
+	 * Notification from a TaskExecutor that a slot has become available.
+	 *
 	 * @param resourceManagerLeaderId TaskExecutor's resource manager leader id
 	 * @param instanceID TaskExecutor's instance id
 	 * @param slotId The slot id of the available slot
@@ -491,13 +491,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Registers an info message listener
+	 * Registers an info message listener.
 	 *
 	 * @param address address of infoMessage listener to register to this resource manager
 	 */
 	@Override
 	public void registerInfoMessageListener(final String address) {
-		if(infoMessageListeners.containsKey(address)) {
+		if (infoMessageListeners.containsKey(address)) {
 			log.warn("Receive a duplicate registration from info message listener on ({})", address);
 		} else {
 			CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService()
@@ -517,7 +517,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Unregisters an info message listener
+	 * Unregisters an info message listener.
 	 *
 	 * @param address of the  info message listener to unregister from this resource manager
 	 *
@@ -528,7 +528,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Cleanup application and shut down cluster
+	 * Cleanup application and shut down cluster.
 	 *
 	 * @param finalStatus of the Flink application
 	 * @param optionalDiagnostics for the Flink application
@@ -825,28 +825,15 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	/**
 	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
-	 * This method should be used when asynchronous threads want to notify the
-	 * ResourceManager of a fatal error.
-	 *
-	 * @param t The exception describing the fatal error
-	 */
-	protected void onFatalErrorAsync(final Throwable t) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				onFatalError(t);
-			}
-		});
-	}
-
-	/**
-	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
-	 * This method must only be called from within the ResourceManager's main thread.
 	 *
 	 * @param t The exception describing the fatal error
 	 */
 	protected void onFatalError(Throwable t) {
-		log.error("Fatal error occurred.", t);
+		try {
+			log.error("Fatal error occurred in ResourceManager.", t);
+		} catch (Throwable ignored) {}
+
+		// The fatal error handler implementation should make sure that this call is non-blocking
 		fatalErrorHandler.onFatalError(t);
 	}
 
@@ -855,7 +842,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Callback method when current resourceManager is granted leadership
+	 * Callback method when current resourceManager is granted leadership.
 	 *
 	 * @param newLeaderSessionID unique leadershipID
 	 */
@@ -904,13 +891,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Handles error occurring in the leader election service
+	 * Handles error occurring in the leader election service.
 	 *
 	 * @param exception Exception being thrown in the leader election service
 	 */
 	@Override
 	public void handleError(final Exception exception) {
-		onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
+		onFatalError(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
 	}
 
 	// ------------------------------------------------------------------------
@@ -928,7 +915,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * The framework specific code for shutting down the application. This should report the
 	 * application's final status and shut down the resource manager cleanly.
 	 *
-	 * This method also needs to make sure all pending containers that are not registered
+	 * <p>This method also needs to make sure all pending containers that are not registered
 	 * yet are returned.
 	 *
 	 * @param finalStatus The application status to report.
@@ -1029,7 +1016,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 		@Override
 		public void handleError(Throwable error) {
-			onFatalErrorAsync(error);
+			onFatalError(error);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
index 7721117..dbccaa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
@@ -18,7 +18,18 @@
 
 package org.apache.flink.runtime.rpc;
 
+/**
+ * Handler for fatal errors.
+ */
 public interface FatalErrorHandler {
 
+	/**
+	 * Being called when a fatal error occurs.
+	 *
+	 * <p>IMPORTANT: This call should never be blocking since it might be called from within
+	 * the main thread of an {@link RpcEndpoint}.
+	 *
+	 * @param exception cause
+	 */
 	void onFatalError(Throwable exception);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a5ce84b..21bdeec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -231,7 +231,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		try {
 			haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
 		} catch (Exception e) {
-			onFatalErrorAsync(e);
+			onFatalError(e);
 		}
 
 		// tell the task slot table who's responsible for the task slot actions
@@ -1117,35 +1117,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	/**
 	 * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
-	 * This method should be used when asynchronous threads want to notify the
-	 * TaskExecutor of a fatal error.
-	 *
-	 * @param t The exception describing the fatal error
-	 */
-	void onFatalErrorAsync(final Throwable t) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				onFatalError(t);
-			}
-		});
-	}
-
-	/**
-	 * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
-	 * This method must only be called from within the TaskExecutor's main thread.
 	 *
 	 * @param t The exception describing the fatal error
 	 */
 	void onFatalError(final Throwable t) {
-		log.error("Fatal error occurred.", t);
-		// this could potentially be a blocking call -> call asynchronously:
-		getRpcService().execute(new Runnable() {
-			@Override
-			public void run() {
-				fatalErrorHandler.onFatalError(t);
-			}
-		});
+		try {
+			log.error("Fatal error occurred in TaskExecutor.", t);
+		} catch (Throwable ignored) {}
+
+		// The fatal error handler implementation should make sure that this call is non-blocking
+		fatalErrorHandler.onFatalError(t);
 	}
 
 	// ------------------------------------------------------------------------
@@ -1183,7 +1164,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void handleError(Exception exception) {
-			onFatalErrorAsync(exception);
+			onFatalError(exception);
 		}
 	}
 
@@ -1223,7 +1204,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void handleError(Throwable throwable) {
-			onFatalErrorAsync(throwable);
+			onFatalError(throwable);
 		}
 	}
 
@@ -1245,7 +1226,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void onRegistrationFailure(Throwable failure) {
-			onFatalErrorAsync(failure);
+			onFatalError(failure);
 		}
 	}
 
@@ -1270,7 +1251,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyFatalError(String message, Throwable cause) {
-			log.error(message, cause);
+			try {
+				log.error(message, cause);
+			} catch (Throwable ignored) {}
+
+			// The fatal error handler implementation should make sure that this call is non-blocking
 			fatalErrorHandler.onFatalError(cause);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index 00b73a8..3e58da5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -18,9 +18,7 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
@@ -45,9 +43,4 @@ public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 	protected String getYarnJobClusterEntrypoint() {
 		return YarnJobClusterEntrypoint.class.getName();
 	}
-
-	@Override
-	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
-		throw new UnsupportedOperationException("Cannot yet deploy a per-job yarn cluster.");
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df5a2c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c3398c4..dd12fef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -293,7 +293,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 
 	@Override
 	public void onError(Throwable error) {
-		onFatalErrorAsync(error);
+		onFatalError(error);
 	}
 
 	//Utility methods