Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:54 UTC

[58/82] [abbrv] incubator-flink git commit: Reworked local cluster start. TaskManager watches JobManager and tries reregistration in case of disconnect. Introduced akka.ask.timeout config parameter to configure akka timeouts.

Reworked local cluster start. TaskManager watches JobManager and tries reregistration in case of disconnect. Introduced akka.ask.timeout config parameter to configure akka timeouts.
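
[Editor's note] The new parameter is read as an integer number of seconds (ConfigConstants.AKKA_ASK_TIMEOUT, default 100) and wrapped into a FiniteDuration wherever blocking Akka calls are made, as the hunks below show. A minimal sketch of that pattern, assuming a loaded Flink Configuration is at hand; the helper name askTimeout is illustrative only:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.configuration.{ConfigConstants, Configuration}
    import scala.concurrent.duration.FiniteDuration

    // Read akka.ask.timeout (in seconds, default 100) and turn it into the
    // FiniteDuration that the reworked blocking calls in this commit expect.
    def askTimeout(config: Configuration): FiniteDuration =
      new FiniteDuration(
        config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
          ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT),
        TimeUnit.SECONDS)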


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b8d0a0aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b8d0a0aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b8d0a0aa

Branch: refs/heads/master
Commit: b8d0a0aadaed012ccb93176dcc21acbdbd005eea
Parents: 26c7794
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 12 19:48:36 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 16 +++++--
 .../org/apache/flink/client/program/Client.java |  9 +++-
 .../flink/client/web/JobsInfoServlet.java       | 11 ++++-
 .../flink/client/web/WebInterfaceServer.java    |  5 ++-
 .../apache/flink/client/program/ClientTest.java |  6 ++-
 .../flink/configuration/ConfigConstants.java    |  7 +++
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |  2 +-
 .../src/main/flink-bin/bin/start-local.bat      |  2 +-
 .../flink/runtime/executiongraph/Execution.java | 11 ++++-
 .../runtime/executiongraph/ExecutionGraph.java  |  1 -
 .../runtime/io/network/ChannelManager.java      | 10 ++++-
 .../jobmanager/web/JobmanagerInfoServlet.java   | 26 ++++++-----
 .../jobmanager/web/SetupInfoServlet.java        | 11 +++--
 .../runtime/jobmanager/web/WebInfoServer.java   | 15 +++++--
 .../taskmanager/TaskInputSplitProvider.java     |  9 +++-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 37 ++++++---------
 .../apache/flink/runtime/client/JobClient.scala | 18 ++++----
 .../flink/runtime/jobmanager/JobManager.scala   | 47 +++++++++++++++-----
 .../jobmanager/JobManagerCLIConfiguration.scala |  8 +++-
 .../runtime/jobmanager/WithWebServer.scala      |  2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 15 ++++---
 .../minicluster/LocalFlinkMiniCluster.scala     | 36 ---------------
 .../flink/runtime/taskmanager/TaskManager.scala | 30 +++++++++----
 .../taskmanager/TaskManagerConfiguration.scala  |  5 ++-
 .../runtime/taskmanager/TaskManagerTest.java    | 17 ++++---
 .../runtime/jobmanager/JobManagerITCase.scala   |  2 +
 .../test/cancelling/CancellingTestBase.java     |  7 ++-
 27 files changed, 221 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 85ab07a..a4d1ac6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -33,6 +33,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -63,6 +64,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
 import org.apache.flink.util.StringUtils;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Implementation of a simple command line fronted for executing programs.
@@ -511,7 +513,7 @@ public class CliFrontend {
 			}
 
 			Iterable<ExecutionGraph> jobs = AkkaUtils.<RunningJobs>ask(jobManager,
-					RequestRunningJobs$.MODULE$).asJavaIterable();
+					RequestRunningJobs$.MODULE$, getAkkaTimeout()).asJavaIterable();
 
 			ArrayList<ExecutionGraph> runningJobs = null;
 			ArrayList<ExecutionGraph> scheduledJobs = null;
@@ -632,7 +634,7 @@ public class CliFrontend {
 				return 1;
 			}
 
-			AkkaUtils.ask(jobManager, new CancelJob(jobId));
+			AkkaUtils.ask(jobManager, new CancelJob(jobId), getAkkaTimeout());
 			return 0;
 		}
 		catch (Throwable t) {
@@ -756,7 +758,8 @@ public class CliFrontend {
 		}
 
 		return JobManager.getJobManager(jobManagerAddress,
-				ActorSystem.create("CliFrontendActorSystem", AkkaUtils.getDefaultActorSystemConfig()));
+				ActorSystem.create("CliFrontendActorSystem", AkkaUtils
+						.getDefaultActorSystemConfig()),getAkkaTimeout());
 	}
 	
 	
@@ -815,6 +818,13 @@ public class CliFrontend {
 		}
 		return GlobalConfiguration.getConfiguration();
 	}
+
+	protected FiniteDuration getAkkaTimeout(){
+		Configuration config = getGlobalConfiguration();
+
+		return new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+				ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+	}
 	
 	public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
 		List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index d2c9983..203f294 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -53,6 +54,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -301,12 +303,15 @@ public class Client {
 		String hostname = configuration.getString(ConfigConstants
 				.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 
+		FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants
+				.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+
 		if(hostname == null){
 			throw new ProgramInvocationException("Could not find hostname of job manager.");
 		}
 
 		try {
-			JobClient.uploadJarFiles(jobGraph, hostname, client);
+			JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
 		}catch(IOException e){
 			throw new ProgramInvocationException("Could not upload blobs.", e);
 		}
@@ -317,7 +322,7 @@ public class Client {
 				return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client);
 			}
 			else {
-				SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client);
+				SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client, timeout);
 
 				if(response instanceof SubmissionFailure){
 					SubmissionFailure failure = (SubmissionFailure) response;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
index ba7f112..e339ec7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import scala.concurrent.duration.FiniteDuration;
 
 
 public class JobsInfoServlet extends HttpServlet {
@@ -51,11 +53,15 @@ public class JobsInfoServlet extends HttpServlet {
 	private final Configuration config;
 
 	private final ActorSystem system;
+
+	private final FiniteDuration timeout;
 	
 	public JobsInfoServlet(Configuration flinkConfig) {
 		this.config = flinkConfig;
 		system = ActorSystem.create("JobsInfoServletActorSystem",
 				AkkaUtils.getDefaultActorSystemConfig());
+		this.timeout = new FiniteDuration(flinkConfig.getInteger(ConfigConstants
+				.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
 	}
 
 	@Override
@@ -67,10 +73,11 @@ public class JobsInfoServlet extends HttpServlet {
 			int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
 					ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
 
-			ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system);
+			ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system,
+					timeout);
 
 			Iterator<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jm,
-					RequestRunningJobs$.MODULE$).asJavaIterable().iterator();
+					RequestRunningJobs$.MODULE$, timeout).asJavaIterable().iterator();
 
 
 			resp.setStatus(HttpServletResponse.SC_OK);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index 1de21ff..ad7b6d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -95,7 +95,7 @@ public class WebInterfaceServer {
 			throw new FileNotFoundException("Cannot start web interface server because the web " +
 					"root dir " + WEB_ROOT_DIR + " is not included in the jar.");
 		}
-		
+
 		String tmpDirPath = config.getString(ConfigConstants.WEB_TMP_DIR_KEY,
 			ConfigConstants.DEFAULT_WEB_TMP_DIR);
 		
@@ -155,7 +155,8 @@ public class WebInterfaceServer {
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
 		servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan");
-		servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)), "/jobsInfo");
+		servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)),
+				"/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)), "/showPlan");
 		servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs");
 		servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(nepheleConfig, uploadDir, planDumpDir)),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 7f81ccd..aa7f4aa 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -43,8 +43,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -105,7 +107,7 @@ public class ClientTest {
 	@Test
 	public void shouldSubmitToJobClient() throws ProgramInvocationException, IOException {
 		when(mockJobClient.submitJobDetached(any(JobGraph.class),
-				any(ActorRef.class))).thenReturn(mockSubmissionSuccess);
+				any(ActorRef.class), any(FiniteDuration.class))).thenReturn(mockSubmissionSuccess);
 
 		Client out = new Client(configMock, getClass().getClassLoader());
 		out.run(program.getPlanWithJars(), -1, false);
@@ -118,7 +120,7 @@ public class ClientTest {
 	@Test(expected = ProgramInvocationException.class)
 	public void shouldThrowException() throws Exception {
 		when(mockJobClient.submitJobDetached(any(JobGraph.class),
-				any(ActorRef.class))).thenReturn(mockSubmissionFailure);
+				any(ActorRef.class), any(FiniteDuration.class))).thenReturn(mockSubmissionFailure);
 
 		Client out = new Client(configMock, getClass().getClassLoader());
 		out.run(program.getPlanWithJars(), -1, false);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4e87581..047cfd1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -346,6 +346,11 @@ public final class ConfigConstants {
 	 * Log level for akka
 	 */
 	public static final String AKKA_LOG_LEVEL = "akka.loglevel";
+
+	/**
+	 * Timeout for all blocking calls
+	 */
+	public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
 	
 	// ----------------------------- Miscellaneous ----------------------------
 	
@@ -594,6 +599,8 @@ public final class ConfigConstants {
 	public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
 	public static String DEFAULT_AKKA_LOG_LEVEL = "OFF";
+
+	public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
 	
 
 	// ----------------------------- LocalExecution ----------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 78c9500..70dde8a 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -80,7 +80,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo Starting job manager
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR"  > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR"  > "$out" 2>&1 < /dev/null &
         echo $! > $pid
     ;;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 05d14e5..374254f 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster --configDir %FLINK_CONF_DIR%  > "%out%"  2>&1
+java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir %FLINK_CONF_DIR%  > "%out%"  2>&1
 
 endlocal

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a42faf3..2024d0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -27,10 +27,13 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,6 +54,7 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -78,6 +82,9 @@ public class Execution {
 	private static final Logger LOG = ExecutionGraph.LOG;
 	
 	private static final int NUM_CANCEL_CALL_TRIES = 3;
+
+	public static FiniteDuration timeout = new FiniteDuration(ConfigConstants
+			.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -273,7 +280,7 @@ public class Execution {
 
 			Instance instance = slot.getInstance();
 			Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
-					new TaskManagerMessages.SubmitTask(deployment),AkkaUtils.FUTURE_TIMEOUT());
+					new TaskManagerMessages.SubmitTask(deployment), new Timeout(timeout));
 
 			deployAction.onComplete(new OnComplete<Object>(){
 
@@ -583,7 +590,7 @@ public class Execution {
 
 		Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
 				TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
-				AkkaUtils.globalExecutionContext());
+				AkkaUtils.globalExecutionContext(), timeout);
 
 		cancelResult.onComplete(new OnComplete<Object>(){
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 99053ea..4e6a56b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -103,7 +103,6 @@ public class ExecutionGraph {
 	
 	private final long[] stateTimestamps;
 	
-	
 	private final Object progressLock = new Object();
 	
 	private int nextVertexToFinish;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 9e3b23c..cd07cc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
+import scala.concurrent.duration.FiniteDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,14 +79,19 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 	
 	private final DiscardBufferPool discardBufferPool;
 
+	private final FiniteDuration timeout;
+
 	// -----------------------------------------------------------------------------------------------------------------
 
 	public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo connectionInfo, int numNetworkBuffers,
-						int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException {
+						int networkBufferSize, NetworkConnectionManager networkConnectionManager,
+						FiniteDuration timeout) throws IOException {
 
 		this.channelLookup= channelLookup;
 		this.connectionInfo = connectionInfo;
 
+		this.timeout = timeout;
+
 		try {
 			this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
 		} catch (Throwable e) {
@@ -378,7 +384,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 				try{
 					lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
 							new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
-									sourceChannelID)).response();
+									sourceChannelID), timeout).response();
 				}catch(IOException ioe) {
 					throw ioe;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index f52da0d..e1afa7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
+import scala.concurrent.duration.FiniteDuration;
 
 
 public class JobmanagerInfoServlet extends HttpServlet {
@@ -75,11 +76,13 @@ public class JobmanagerInfoServlet extends HttpServlet {
 	/** Underlying JobManager */
 	private final ActorRef jobmanager;
 	private final ActorRef archive;
+	private final FiniteDuration timeout;
 	
 	
-	public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive) {
+	public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
 		this.jobmanager = jobmanager;
 		this.archive = archive;
+		this.timeout = timeout;
 	}
 	
 	
@@ -92,14 +95,15 @@ public class JobmanagerInfoServlet extends HttpServlet {
 		try {
 			if("archive".equals(req.getParameter("get"))) {
 				List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
-						.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$).asJavaCollection());
+						.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+						.asJavaCollection());
 
 				writeJsonForArchive(resp.getWriter(), archivedJobs);
 			}
 			else if("job".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 				JobResponse response = AkkaUtils.ask(archive,
-						new RequestJob(JobID.fromHexString(jobId)));
+						new RequestJob(JobID.fromHexString(jobId)), timeout);
 
 				if(response instanceof JobFound){
 					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
@@ -113,7 +117,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				String groupvertexId = req.getParameter("groupvertex");
 
 				JobResponse response = AkkaUtils.ask(archive,
-						new RequestJob(JobID.fromHexString(jobId)));
+						new RequestJob(JobID.fromHexString(jobId)), timeout);
 
 				if(response instanceof JobFound && groupvertexId != null){
 					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
@@ -126,9 +130,9 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
 				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
-						RequestNumberRegisteredTaskManager$.MODULE$);
+						RequestNumberRegisteredTaskManager$.MODULE$, timeout);
 				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
-						RequestTotalNumberOfSlots$.MODULE$);
+						RequestTotalNumberOfSlots$.MODULE$, timeout);
 
 				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
 						"\"slots\": "+numberOfRegisteredSlots+"}");
@@ -136,7 +140,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 				AkkaUtils.<CancellationResponse>ask(jobmanager,
-						new CancelJob(JobID.fromHexString(jobId)));
+						new CancelJob(JobID.fromHexString(jobId)), timeout);
 			}
 			else if("updates".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
@@ -146,7 +150,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			}
 			else{
 				Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
-						(jobmanager, RequestRunningJobs$.MODULE$).asJavaIterable();
+						(jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
 				writeJsonForJobs(resp.getWriter(), runningJobs);
 			}
 			
@@ -324,7 +328,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			
 			// write accumulators
 			AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager,
-					new RequestAccumulatorResults(graph.getJobID()));
+					new RequestAccumulatorResults(graph.getJobID()), timeout);
 
 			if(response instanceof AccumulatorResultsFound){
 				Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap();
@@ -417,7 +421,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 		
 		try {
 			Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
-					RequestRunningJobs$.MODULE$).asJavaIterable();
+					RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
 			
 			//Serialize job to json
 			wrt.write("{");
@@ -439,7 +443,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 
 			wrt.write("],");
 
-			JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId));
+			JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout);
 
 			if(response instanceof JobFound){
 				ExecutionGraph graph = ((JobFound)response).executionGraph();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index c0308cb..9e0a55b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -46,6 +46,7 @@ import org.codehaus.jettison.json.JSONObject;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A Servlet that displays the Configuration in the web interface.
@@ -59,13 +60,15 @@ public class SetupInfoServlet extends HttpServlet {
 	private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
 	
 	
-	private Configuration globalC;
-	private ActorRef jobmanager;
+	final private Configuration globalC;
+	final private ActorRef jobmanager;
+	final private FiniteDuration timeout;
 	
 	
-	public SetupInfoServlet(ActorRef jm) {
+	public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) {
 		globalC = GlobalConfiguration.getConfiguration();
 		this.jobmanager = jm;
+		this.timeout = timeout;
 	}
 	
 	@Override
@@ -104,7 +107,7 @@ public class SetupInfoServlet extends HttpServlet {
 	private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
 
 		List<Instance> instances = new ArrayList<Instance>(AkkaUtils.<RegisteredTaskManagers>ask
-				(jobmanager, RequestRegisteredTaskManagers$.MODULE$).asJavaCollection());
+				(jobmanager, RequestRegisteredTaskManagers$.MODULE$, timeout).asJavaCollection());
 
 		Collections.sort(instances, INSTANCE_SORTER);
 				

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index c397a30..7e67603 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -40,6 +40,7 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import scala.concurrent.duration.FiniteDuration;
 
 
 /**
@@ -64,6 +65,11 @@ public class WebInfoServer {
 	private final Server server;
 
 	/**
+	 * Timeout for akka requests
+	 */
+	private final FiniteDuration timeout;
+
+	/**
 	 * Port for info server
 	 */
 	private int port;
@@ -78,10 +84,12 @@ public class WebInfoServer {
 	 *         Thrown, if the server setup failed for an I/O related reason.
 	 */
 	public WebInfoServer(Configuration config, ActorRef jobmanager,
-						ActorRef archive) throws IOException {
+						ActorRef archive, FiniteDuration timeout) throws IOException {
 		this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 
+		this.timeout = timeout;
+
 		// if no explicit configuration is given, use the global configuration
 		if (config == null) {
 			config = GlobalConfiguration.getConfiguration();
@@ -122,9 +130,10 @@ public class WebInfoServer {
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
 		servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager,
-				archive)), "/jobsInfo");
+				archive, timeout)), "/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
-		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo");
+		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager, timeout)),
+				"/setupInfo");
 		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 8049004..669f94c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import scala.concurrent.duration.FiniteDuration;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
 
@@ -34,18 +35,22 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	private final JobID jobId;
 	
 	private final JobVertexID vertexId;
+
+	private final FiniteDuration timeout;
 	
-	public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId) {
+	public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
+								  FiniteDuration timeout) {
 		this.jobManager = jobManager;
 		this.jobId = jobId;
 		this.vertexId = vertexId;
+		this.timeout = timeout;
 	}
 
 	@Override
 	public InputSplit getNextInputSplit() {
 		try {
 			TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager,
-					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId));
+					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId), timeout);
 
 			return nextInputSplit.inputSplit();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index ce87e0e..f931497 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -30,9 +30,7 @@ import scala.concurrent.{ExecutionContext, Future, Await}
 import scala.concurrent.duration._
 
 object AkkaUtils {
-  implicit val FUTURE_TIMEOUT: Timeout = 100 minute
-  implicit val AWAIT_DURATION: FiniteDuration = 1 minute
-  implicit val FUTURE_DURATION: FiniteDuration = 1 minute
+  val DEFAULT_TIMEOUT: FiniteDuration = 1 minute
 
   val INF_TIMEOUT = 21474835 seconds
 
@@ -122,34 +120,27 @@ object AkkaUtils {
     ConfigFactory.parseString(getDefaultActorSystemConfigString)
   }
 
-  def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem): ActorRef = {
-    Await.result(system.actorSelection(parent.path / child).resolveOne(), AWAIT_DURATION)
+  def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout:
+  FiniteDuration): ActorRef = {
+    Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout)
   }
 
-  def getReference(path: String)(implicit system: ActorSystem): ActorRef = {
-    Await.result(system.actorSelection(path).resolveOne(), AWAIT_DURATION)
+  def getReference(path: String)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef
+  = {
+    Await.result(system.actorSelection(path).resolveOne()(timeout), timeout)
   }
 
   @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any): T = {
-    ask(actorSelection, msg, FUTURE_TIMEOUT, FUTURE_DURATION)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any): T = {
-    ask(actor, msg, FUTURE_TIMEOUT, FUTURE_DURATION)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any, timeout: Timeout, duration: Duration): T = {
+  def ask[T](actorSelection: ActorSelection, msg: Any)(implicit timeout: FiniteDuration): T
+    = {
     val future = Patterns.ask(actorSelection, msg, timeout)
-    Await.result(future, duration).asInstanceOf[T]
+    Await.result(future, timeout).asInstanceOf[T]
   }
 
   @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any, timeout: Timeout, duration: Duration): T = {
+  def ask[T](actor: ActorRef, msg: Any)(implicit timeout: FiniteDuration): T = {
     val future = Patterns.ask(actor, msg, timeout)
-    Await.result(future, duration).asInstanceOf[T]
+    Await.result(future, timeout).asInstanceOf[T]
   }
 
   def askInf[T](actor: ActorRef, msg: Any): T = {
@@ -174,8 +165,8 @@ object AkkaUtils {
   }
 
   def retry(target: ActorRef, message: Any, tries: Int)(implicit executionContext:
-  ExecutionContext): Future[Any] = {
-    (target ? message) recoverWith{
+  ExecutionContext, timeout: FiniteDuration): Future[Any] = {
+    (target ? message)(timeout) recoverWith{
       case t: Throwable =>
         if(tries > 0){
           retry(target, message, tries-1)
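
[Editor's note] With the reworked AkkaUtils, the ask helpers take the timeout as an implicit FiniteDuration instead of the removed FUTURE_TIMEOUT/FUTURE_DURATION globals. A minimal usage sketch, assuming a JobManager ActorRef is already available (e.g. via JobManager.getJobManager); the wrapper listRunningJobs is illustrative only:

    import java.util.concurrent.TimeUnit
    import akka.actor.ActorRef
    import org.apache.flink.runtime.akka.AkkaUtils
    import org.apache.flink.runtime.messages.JobManagerMessages.{RequestRunningJobs, RunningJobs}
    import scala.concurrent.duration.FiniteDuration

    def listRunningJobs(jobManager: ActorRef): RunningJobs = {
      // The caller now chooses the timeout and supplies it implicitly.
      implicit val timeout: FiniteDuration = new FiniteDuration(100, TimeUnit.SECONDS)
      AkkaUtils.ask[RunningJobs](jobManager, RequestRunningJobs)
    }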

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index a9733de..0d9f672 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -35,13 +35,14 @@ import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, S
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
 import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{FiniteDuration, Duration}
 
 
-class JobClient(jobManagerURL: String) extends Actor with ActorLogMessages with ActorLogging{
+class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor with ActorLogMessages
+with  ActorLogging{
   import context._
 
-  val jobManager = AkkaUtils.getReference(jobManagerURL)
+  val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
 
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
@@ -120,15 +121,16 @@ object JobClient{
   }
 
 
-  def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef): SubmissionResponse = {
-    import AkkaUtils.FUTURE_TIMEOUT
-    val response = jobClient ? SubmitJobDetached(jobGraph)
+  def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit timeout: FiniteDuration):
+  SubmissionResponse = {
+    val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
 
-    Await.result(response.mapTo[SubmissionResponse],AkkaUtils.FUTURE_DURATION)
+    Await.result(response.mapTo[SubmissionResponse],timeout)
   }
 
   @throws(classOf[IOException])
-  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef): Unit = {
+  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(implicit timeout:
+   FiniteDuration): Unit = {
     val port = AkkaUtils.ask[Int](jobClient, RequestBlobManagerPort)
 
     val serverAddress = new InetSocketAddress(hostname, port)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 75f2a63..1fa89c1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.File
 import java.net.{InetSocketAddress}
+import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import akka.pattern.Patterns
@@ -28,10 +29,11 @@ import com.google.common.base.Preconditions
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -52,7 +54,10 @@ import scala.concurrent.duration._
 class JobManager(val configuration: Configuration) extends
 Actor with ActorLogMessages with ActorLogging with WrapAsScala {
   import context._
-  import AkkaUtils.FUTURE_TIMEOUT
+  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
+  Execution.timeout = timeout;
 
   log.info("Starting job manager.")
 
@@ -329,7 +334,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case RequestJobStatus(jobID) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) => sender() ! CurrentJobStatus(jobID, executionGraph.getState)
-        case None => archive ? RequestJobStatus(jobID) pipeTo sender()
+        case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender()
       }
     }
 
@@ -344,7 +349,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case RequestJob(jobID) => {
       currentJobs.get(jobID) match {
         case Some((eg, _)) => sender() ! JobFound(jobID, eg)
-        case None => archive ? RequestJob(jobID) pipeTo sender()
+        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender()
       }
     }
 
@@ -384,6 +389,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
 }
 
 object JobManager {
+  import ExecutionMode._
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
   val FAILURE_RETURN_CODE = 1
   val JOB_MANAGER_NAME = "jobmanager"
@@ -392,19 +398,34 @@ object JobManager {
   val PROFILER_NAME = "profiler"
 
   def main(args: Array[String]): Unit = {
-    val (hostname, port, configuration) = parseArgs(args)
+    val (hostname, port, configuration, executionMode) = parseArgs(args)
 
     val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
 
     startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
+
+    if(executionMode.equals(LOCAL)){
+      TaskManager.startActorWithConfiguration(hostname, configuration, true)(jobManagerSystem)
+    }
+
     jobManagerSystem.awaitTermination()
+    println("Shutting down.")
   }
 
-  def parseArgs(args: Array[String]): (String, Int, Configuration) = {
+  def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
       head("flink jobmanager")
       opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " +
         "configuration directory.")
+      opt[String]("executionMode") optional() action { (x, c) =>
+        if(x.equals("local")){
+          c.copy(executionMode = LOCAL)
+        }else{
+          c.copy(executionMode = CLUSTER)
+        }
+      } text {
+        "Specify execution mode of job manager"
+      }
     }
 
     parser.parse(args, JobManagerCLIConfiguration()) map {
@@ -419,7 +440,7 @@ object JobManager {
         val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        (hostname, port, configuration)
+        (hostname, port, configuration, config.executionMode)
     } getOrElse {
       LOG.error("CLI Parsing failed. Usage: " + parser.usage)
       sys.exit(FAILURE_RETURN_CODE)
@@ -456,19 +477,23 @@ object JobManager {
     s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
   }
 
-  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
+  ActorRef = {
     AkkaUtils.getChild(jobManager, PROFILER_NAME)
   }
 
-  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem, timeout:
+  FiniteDuration): ActorRef = {
     AkkaUtils.getChild(jobManager, EVENT_COLLECTOR_NAME)
   }
 
-  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
+  ActorRef = {
     AkkaUtils.getChild(jobManager, ARCHIVE_NAME)
   }
 
-  def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem): ActorRef = {
+  def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout:
+  FiniteDuration): ActorRef = {
     AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
index d588f95..d08aecc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
@@ -18,5 +18,11 @@
 
 package org.apache.flink.runtime.jobmanager
 
-case class JobManagerCLIConfiguration(configDir: String = null) {
+object ExecutionMode extends Enumeration{
+  type ExecutionMode = Value
+  val LOCAL = Value
+  val CLUSTER = Value
 }
+
+case class JobManagerCLIConfiguration(configDir: String = null, executionMode: ExecutionMode
+.ExecutionMode = ExecutionMode.CLUSTER) {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
index 81fecbf..715fc0c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 trait WithWebServer extends Actor {
   that: JobManager =>
 
-  val webServer = new WebInfoServer(configuration,self, archive)
+  val webServer = new WebInfoServer(configuration,self, archive, timeout)
   webServer.start()
 
   abstract override def postStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 35def3d..6d0da27 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.util.concurrent.TimeUnit
+
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.api.common.io.FileOutputFormat
@@ -27,11 +29,15 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Future, Await}
 
 abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   import FlinkMiniCluster._
 
+  implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants
+    .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
   val configuration = generateConfiguration(userConfiguration)
 
   val jobManagerActorSystem = startJobManagerActorSystem()
@@ -92,19 +98,18 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   }
 
   def awaitTermination(): Unit = {
-    taskManagerActorSystems foreach { _.awaitTermination(AkkaUtils.AWAIT_DURATION)}
-    jobManagerActorSystem.awaitTermination(AkkaUtils.AWAIT_DURATION)
+    taskManagerActorSystems foreach { _.awaitTermination()}
+    jobManagerActorSystem.awaitTermination()
   }
 
   def waitForTaskManagersToBeRegistered(): Unit = {
-    implicit val timeout = AkkaUtils.FUTURE_TIMEOUT
     implicit val executionContext = AkkaUtils.globalExecutionContext
 
     val futures = taskManagerActors map {
-      _ ? NotifyWhenRegisteredAtJobManager
+      taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
     }
 
-    Await.ready(Future.sequence(futures), AkkaUtils.AWAIT_DURATION)
+    Await.ready(Future.sequence(futures), timeout)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8625983..fb6e36b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -118,40 +118,4 @@ FlinkMiniCluster(userConfiguration){
 
 object LocalFlinkMiniCluster{
   val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
-  val FAILURE_RETURN_CODE = 1
-
-  def main(args: Array[String]): Unit = {
-    val configuration = parseArgs(args)
-
-    val cluster = new LocalFlinkMiniCluster(configuration)
-
-    cluster.awaitTermination()
-  }
-
-  def parseArgs(args: Array[String]): Configuration = {
-    val parser = new OptionParser[LocalFlinkMiniClusterConfiguration]("LocalFlinkMiniCluster") {
-      head("LocalFlinkMiniCluster")
-      opt[String]("configDir") action { (value, config) => config.copy(configDir = value) } text
-        {"Specify configuration directory."}
-    }
-
-    parser.parse(args, LocalFlinkMiniClusterConfiguration()) map {
-      config =>{
-        GlobalConfiguration.loadConfiguration(config.configDir)
-        val configuration = GlobalConfiguration.getConfiguration
-
-        if(config.configDir != null && new File(config.configDir).isDirectory){
-          configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
-        }
-
-        configuration
-      }
-    } getOrElse{
-      LOG.error("CLI parsing failed. Usage: " + parser.usage)
-      sys.exit(FAILURE_RETURN_CODE)
-    }
-  }
-
-
-  case class LocalFlinkMiniClusterConfiguration(val configDir: String = "")
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 136dc7f..66d25c5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -69,8 +69,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
 
   import context._
-  import AkkaUtils.FUTURE_TIMEOUT
-  import taskManagerConfig._
+  import taskManagerConfig.{timeout => tmTimeout, _}
+  implicit val timeout = tmTimeout
+
 
   log.info(s"Starting task manager at ${self.path}.")
 
@@ -172,7 +173,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
       } else {
         log.error("TaskManager could not register at JobManager.");
-        throw new RuntimeException("TaskManager could not register at JobManager");
+        self ! PoisonPill
       }
     }
 
@@ -182,6 +183,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         currentJobManager = sender()
         instanceID = id
 
+        context.watch(currentJobManager)
+
         log.info(s"TaskManager successfully registered at JobManager ${
           currentJobManager.path
             .toString
@@ -247,7 +250,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           case None =>
         }
 
-        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID)
+        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID, timeout)
         val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
           ioManager, splitProvider,currentJobManager)
 
@@ -356,14 +359,19 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           log.error(s"Cannot find task with ID ${executionID} to unregister.")
       }
     }
+
+    case Terminated(jobManager) => {
+      log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.")
+      tryJobManagerRegistration()
+    }
   }
 
   def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
                                  executionState: ExecutionState,
                                  optionalError: Throwable): Unit = {
     log.info(s"Update execution state to ${executionState}.")
-    val futureResponse = currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
-    (jobID, executionID, executionState, optionalError))
+    val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
+    (jobID, executionID, executionState, optionalError)))(timeout)
 
     val receiver = this.self
 
@@ -402,7 +410,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       }
 
       channelManager = Some(new ChannelManager(currentJobManager, connectionInfo, numBuffers,
-        bufferSize, connectionManager))
+        bufferSize, connectionManager, timeout))
     } catch {
       case ioe: IOException =>
         log.error(ioe, "Failed to instantiate ChannelManager.")
@@ -412,7 +420,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
   def setupLibraryCacheManager(blobPort: Int): Unit = {
     if(blobPort > 0){
-      val address = new InetSocketAddress(currentJobManager.path.address.host.get, blobPort)
+      val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse
+        ("localhost"), blobPort)
       libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval)
     }else{
       libraryCacheManager = new FallbackLibraryCacheManager
@@ -598,8 +607,11 @@ object TaskManager {
       .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
+    val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
     val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize,
-      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval)
+      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout)
 
     (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration)
   }

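The behavioral change in the hunks above is that the TaskManager now death-watches the JobManager actor and, instead of throwing inside the actor, either reregisters on a Terminated message or poisons itself when registration ultimately fails. The following is a minimal, self-contained sketch of that Akka death-watch pattern; the actor, message strings and registration bookkeeping are placeholders for illustration, not Flink's actual classes or messages.

import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Terminated}

// Minimal sketch of the death-watch + reregistration pattern
// (placeholder names, not Flink's actual actors or messages).
class WatchingWorker(managerPath: String, maxAttempts: Int) extends Actor with ActorLogging {

  var manager: Option[ActorRef] = None
  var attempts = 0

  override def preStart(): Unit = tryRegistration()

  def tryRegistration(): Unit = {
    if (attempts < maxAttempts) {
      attempts += 1
      // resolve the manager by path and send a registration request
      context.actorSelection(managerPath) ! "register"
    } else {
      log.error("Could not register at the manager.")
      self ! PoisonPill   // give up cleanly instead of throwing inside the actor
    }
  }

  def receive = {
    case "acknowledged" =>
      manager = Some(sender())
      // death watch: a Terminated message arrives if the manager stops
      // or becomes unreachable
      context.watch(sender())

    case Terminated(ref) =>
      log.info(s"Manager ${ref.path} is no longer reachable. Trying to reregister.")
      manager = None
      attempts = 0
      tryRegistration()
  }
}
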
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 150db73..a6a76a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import scala.concurrent.duration.FiniteDuration
+
 case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
                                     tmpDirPaths: Array[String], cleanupInterval: Long,
                                     memoryLogggingIntervalMs: Option[Long],
-                                    profilingInterval: Option[Long])
+                                    profilingInterval: Option[Long],
+                                    timeout: FiniteDuration)

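The timeout added to this case class is the per-instance ask timeout that TaskManager.scala above builds from the AKKA_ASK_TIMEOUT setting. A rough sketch of how such a value is constructed and carried, with illustrative numbers only (in Flink the actual values come from the Configuration):

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration

// Illustrative values only; in Flink these come from the Configuration
// (the integer behind ConfigConstants.AKKA_ASK_TIMEOUT is interpreted as seconds).
object ExampleTaskManagerConfig {
  val askTimeoutSeconds = 100
  val timeout = FiniteDuration(askTimeoutSeconds, TimeUnit.SECONDS)

  val tmConfig = TaskManagerConfiguration(
    numberOfSlots = 4,
    memorySize = 64L << 20,                     // managed memory in bytes (illustrative)
    pageSize = 32 * 1024,
    tmpDirPaths = Array(System.getProperty("java.io.tmpdir")),
    cleanupInterval = 3600 * 1000L,             // library cache cleanup interval in ms
    memoryLogggingIntervalMs = None,
    profilingInterval = None,
    timeout = timeout)
}
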
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index c6b4fb5..b76944b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -32,6 +32,7 @@ import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -73,6 +74,8 @@ public class TaskManagerTest {
 
 	private static ActorSystem system;
 
+	private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+
 	@BeforeClass
 	public static void setup(){
 		system = ActorSystem.create("TestActorSystem", TestingUtils.testConfig());
@@ -178,7 +181,7 @@ public class TaskManagerTest {
 							expectMsgEquals(new TaskOperationResult(eid1, true));
 
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-									AkkaUtils.FUTURE_TIMEOUT());
+									timeout);
 							Await.ready(response, d);
 
 							assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
@@ -197,7 +200,7 @@ public class TaskManagerTest {
 							expectMsgEquals(new TaskOperationResult(eid2, true));
 
 							response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-									AkkaUtils.FUTURE_TIMEOUT());
+									timeout);
 							Await.ready(response, d);
 
 							assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
@@ -336,13 +339,13 @@ public class TaskManagerTest {
 			// we get to the check, so we need to guard the check
 						if (t1 != null) {
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-									AkkaUtils.FUTURE_TIMEOUT());
+									timeout);
 							Await.ready(response, d);
 						}
 
 						if (t2 != null) {
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-									AkkaUtils.FUTURE_TIMEOUT());
+									timeout);
 							Await.ready(response, d);
 				assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
 			}
@@ -424,7 +427,7 @@ public class TaskManagerTest {
 
 						if (t2 != null) {
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-									AkkaUtils.FUTURE_TIMEOUT());
+									timeout);
 							Await.ready(response, d);
 						}
 
@@ -434,7 +437,7 @@ public class TaskManagerTest {
 								expectMsgEquals(new TaskOperationResult(eid1, true));
 							}
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-									AkkaUtils.FUTURE_TIMEOUT());
+									timeout);
 							Await.ready(response, d);
 						}
 
@@ -538,7 +541,7 @@ public class TaskManagerTest {
 		ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
 
 		Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$,
-				AkkaUtils.FUTURE_TIMEOUT());
+				timeout);
 
 		try {
 			FiniteDuration d = new FiniteDuration(2, TimeUnit.SECONDS);

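The test changes above all follow the same pattern: instead of the removed global AkkaUtils.FUTURE_TIMEOUT, each ask carries an explicitly constructed timeout. In Scala the equivalent looks roughly like the sketch below (actor and message are placeholders); the Java tests use akka.pattern.Patterns.ask with an akka.util.Timeout in the same way.

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch: ask an actor with a locally constructed timeout and block until
// the reply arrives (typical in tests; actor and message are placeholders).
object AskWithExplicitTimeout {
  def askAndWait(taskManager: ActorRef, message: Any): Any = {
    implicit val timeout: Timeout = Timeout(1.minute)  // replaces the removed global constant
    val future = taskManager ? message                 // uses the implicit timeout
    Await.result(future, 1.minute)
  }
}
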
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index e23992c..6689f93 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -37,6 +37,8 @@ import scala.concurrent.duration._
 @RunWith(classOf[JUnitRunner])
 class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
 WordSpecLike with Matchers with BeforeAndAfterAll {
+  implicit val timeout = 1 minute
+
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
   override def afterAll: Unit = {

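The two-line change above gives the test suite an implicit timeout value; helper methods that declare an implicit duration parameter then pick it up automatically. A hypothetical illustration of that mechanism (the trait and helper below are not part of Flink's test utilities):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical base trait for tests; not part of Flink's test utilities.
trait SuiteTimeout {
  // suite-wide implicit timeout, as in the change above
  implicit val timeout: FiniteDuration = 1.minute

  // helper that resolves futures using whatever implicit duration is in scope
  def resultWithin[T](future: Future[T])(implicit max: FiniteDuration): T =
    Await.result(future, max)
}
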
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 2e08dc3..6347cb5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -23,9 +23,8 @@ import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.ExecutionContexts;
 import akka.pattern.Patterns;
-import com.amazonaws.http.ExecutionContext;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -116,7 +115,7 @@ public abstract class CancellingTestBase {
 			boolean jobSuccessfullyCancelled = false;
 
 			Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait
-					(jobGraph, false), AkkaUtils.FUTURE_TIMEOUT());
+					(jobGraph, false), new Timeout(AkkaUtils.DEFAULT_TIMEOUT()));
 
 			actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
 							TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
@@ -125,7 +124,7 @@ public abstract class CancellingTestBase {
 							throw new IllegalStateException("Job restarted");
 
 			try {
-				Await.result(result, AkkaUtils.AWAIT_DURATION());
+				Await.result(result, AkkaUtils.DEFAULT_TIMEOUT());
 			} catch (JobExecutionException exception) {
 				if (!exception.isJobCanceledByUser()) {
 					throw new IllegalStateException("Job Failed.");