You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:46:56 UTC

[1/2] flink git commit: [FLINK-4836] [cluster management] Start ResourceManager in MiniCluster [Forced Update!]

Repository: flink
Updated Branches:
  refs/heads/flip-6 7041f9344 -> 930334ef7 (forced update)


[FLINK-4836] [cluster management] Start ResourceManager in MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b981d6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b981d6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b981d6c

Branch: refs/heads/flip-6
Commit: 0b981d6c952eaa93a13d65ce8284e62edd48f5ce
Parents: b9cebaf
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 17:02:33 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:50:36 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 126 +++++++++++++++----
 .../minicluster/MiniClusterJobDispatcher.java   |   4 +-
 2 files changed, 102 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b981d6c/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ffcd12..d63f9a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,11 +32,18 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.concurrent.GuardedBy;
 
 import java.util.UUID;
@@ -48,13 +55,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 public class MiniCluster {
 
+	private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
+
 	/** The lock to guard startup / shutdown / manipulation methods */
 	private final Object lock = new Object();
 
 	/** The configuration for this mini cluster */
 	private final MiniClusterConfiguration config;
 
-	@GuardedBy("lock")
+	@GuardedBy("lock") 
 	private MetricRegistry metricRegistry;
 
 	@GuardedBy("lock")
@@ -73,6 +82,9 @@ public class MiniCluster {
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private ResourceManager<?>[] resourceManagers;
+
+	@GuardedBy("lock")
 	private TaskManagerRunner[] taskManagerRunners;
 
 	@GuardedBy("lock")
@@ -98,6 +110,7 @@ public class MiniCluster {
 	}
 
 	/**
+	 * Creates a new Flink mini cluster based on the given configuration.
 	 * 
 	 * @param config The configuration for the mini cluster
 	 */
@@ -149,6 +162,9 @@ public class MiniCluster {
 		synchronized (lock) {
 			checkState(!running, "FlinkMiniCluster is already running");
 
+			LOG.info("Starting Flink Mini Cluster");
+			LOG.debug("Using configuration {}", config);
+
 			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
 			final Time rpcTimeout = config.getRpcTimeout();
 			final int numJobManagers = config.getNumJobManagers();
@@ -210,13 +226,21 @@ public class MiniCluster {
 				}
 
 				// create the high-availability services
+				LOG.info("Starting high-availability services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
 
-				// bring up the task managers for the mini cluster
+				// bring up the ResourceManager(s)
+				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
+				resourceManagers = startResourceManagers(
+						configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+
+				// bring up the TaskManager(s) for the mini cluster
+				LOG.info("Starting {} TaskManger(s)", numTaskManagers);
 				taskManagerRunners = startTaskManagers(
 						configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
+				LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
 				jobDispatcher = new MiniClusterJobDispatcher(
 						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
 			}
@@ -232,6 +256,8 @@ public class MiniCluster {
 
 			// now officially mark this as running
 			running = true;
+
+			LOG.info("Flink Mini Cluster started successfully");
 		}
 	}
 
@@ -247,11 +273,13 @@ public class MiniCluster {
 	public void shutdown() throws Exception {
 		synchronized (lock) {
 			if (running) {
+				LOG.info("Shutting down Flink Mini Cluster");
 				try {
 					shutdownInternally();
 				} finally {
 					running = false;
 				}
+				LOG.info("Flink Mini Cluster is shut down");
 			}
 		}
 	}
@@ -270,11 +298,34 @@ public class MiniCluster {
 			try {
 				jobDispatcher.shutdown();
 			} catch (Exception e) {
-				exception = firstOrSuppressed(e, exception);
+				exception = e;
 			}
 			jobDispatcher = null;
 		}
 
+		if (resourceManagers != null) {
+			for (ResourceManager<?> rm : resourceManagers) {
+				if (rm != null) {
+					try {
+						rm.shutDown();
+					} catch (Throwable t) {
+						exception = firstOrSuppressed(t, exception);
+					}
+				}
+			}
+			resourceManagers = null;
+		}
+
+		// shut down the RpcServices
+		exception = shutDownRpc(commonRpcService, exception);
+		exception = shutDownRpcs(jobManagerRpcServices, exception);
+		exception = shutDownRpcs(taskManagerRpcServices, exception);
+		exception = shutDownRpcs(resourceManagerRpcServices, exception);
+		commonRpcService = null;
+		jobManagerRpcServices = null;
+		taskManagerRpcServices = null;
+		resourceManagerRpcServices = null;
+
 		// shut down high-availability services
 		if (haServices != null) {
 			try {
@@ -285,24 +336,6 @@ public class MiniCluster {
 			haServices = null;
 		}
 
-		// shut down the RpcServices
-		if (commonRpcService != null) {
-			exception = shutDownRpc(commonRpcService, exception);
-			commonRpcService = null;
-		}
-		if (jobManagerRpcServices != null) {
-			for (RpcService service : jobManagerRpcServices) {
-				exception = shutDownRpc(service, exception);
-			}
-			jobManagerRpcServices = null;
-		}
-		if (taskManagerRpcServices != null) {
-			for (RpcService service : taskManagerRpcServices) {
-				exception = shutDownRpc(service, exception);
-			}
-			taskManagerRpcServices = null;
-		}
-
 		// metrics shutdown
 		if (metricRegistry != null) {
 			metricRegistry.shutdown();
@@ -402,6 +435,28 @@ public class MiniCluster {
 		return new AkkaRpcService(actorSystem, askTimeout);
 	}
 
+	protected ResourceManager<?>[] startResourceManagers(
+			Configuration configuration,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numResourceManagers,
+			RpcService[] resourceManagerRpcServices) throws Exception {
+
+		final StandaloneResourceManager[] resourceManagers = new StandaloneResourceManager[numResourceManagers];
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); 
+
+		for (int i = 0; i < numResourceManagers; i++) {
+			resourceManagers[i] = new StandaloneResourceManager(
+					resourceManagerRpcServices[i],
+					haServices,
+					slotManagerFactory);
+
+			resourceManagers[i].start();
+		}
+
+		return resourceManagers;
+	}
+
 	protected TaskManagerRunner[] startTaskManagers(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
@@ -429,15 +484,34 @@ public class MiniCluster {
 	// ------------------------------------------------------------------------
 
 	private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
-		try {
-			if (rpcService != null) {
+		if (rpcService != null) {
+			try {
 				rpcService.stopService();
 			}
-			return priorException;
+			catch (Throwable t) {
+				return firstOrSuppressed(t, priorException);
+			}
 		}
-		catch (Throwable t) {
-			return firstOrSuppressed(t, priorException);
+
+		return priorException;
+	}
+
+	private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
+		if (rpcServices != null) {
+			Throwable exception = priorException;
+
+			for (RpcService service : rpcServices) {
+				try {
+					if (service != null) {
+						service.stopService();
+					}
+				}
+				catch (Throwable t) {
+					exception = firstOrSuppressed(t, exception);
+				}
+			}
 		}
+		return priorException;
 	}
 
 	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b981d6c/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d0df293..8ac8eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -176,7 +176,7 @@ public class MiniClusterJobDispatcher {
 	public void runDetached(JobGraph job) throws JobExecutionException {
 		checkNotNull(job);
 
-		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+		LOG.info("Received job for detached execution: {} ({})", job.getName(), job.getJobID());
 
 		synchronized (lock) {
 			checkState(!shutdown, "mini cluster is shut down");
@@ -201,7 +201,7 @@ public class MiniClusterJobDispatcher {
 	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job);
 		
-		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID());
 		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
 
 		synchronized (lock) {


[2/2] flink git commit: Rebasing fixes on latest master

Posted by tr...@apache.org.
Rebasing fixes on latest master


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/930334ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/930334ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/930334ef

Branch: refs/heads/flip-6
Commit: 930334ef7336d66b9161003575d12a3d66805c89
Parents: 0b981d6
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 20 19:54:12 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Oct 21 14:46:37 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/storm/wrappers/BoltWrapperTest.java    |  4 ++--
 .../checkpoint/CheckpointCoordinatorGateway.java        |  5 ++---
 .../org/apache/flink/runtime/jobmaster/JobMaster.java   | 12 ++++++------
 .../taskexecutor/rpc/RpcCheckpointResponder.java        |  6 +++---
 .../org/apache/flink/runtime/util/ZooKeeperUtils.java   | 10 +++-------
 .../apache/flink/runtime/jobmanager/JobSubmitTest.java  |  4 +++-
 .../runtime/operators/testutils/DummyEnvironment.java   |  3 ++-
 .../flink/runtime/taskmanager/TaskAsyncCallTest.java    |  6 ------
 ...mulatingAlignedProcessingTimeWindowOperatorTest.java |  3 ++-
 9 files changed, 23 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index e0659da..ec81fa6 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -370,7 +370,7 @@ public class BoltWrapperTest extends AbstractTest {
 		when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
 		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
-		when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
+		when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 2634006..f9786f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -21,15 +21,14 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 public interface CheckpointCoordinatorGateway extends RpcGateway {
 
 	void acknowledgeCheckpoint(
 			final JobID jobID,
 			final ExecutionAttemptID executionAttemptID,
-			final CheckpointMetaData checkpointInfo,
-			final CheckpointStateHandles checkpointStateHandles);
+			final CheckpointMetaData checkpointMetaData,
+			final SubtaskState subtaskState);
 
 	void declineCheckpoint(
 			final JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 56fa3e7..5a7c9a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -83,7 +84,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -529,12 +529,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	public void acknowledgeCheckpoint(
 			final JobID jobID,
 			final ExecutionAttemptID executionAttemptID,
-			final CheckpointMetaData checkpointInfo,
-			final CheckpointStateHandles checkpointState)
+			final CheckpointMetaData checkpointMetaData,
+			final SubtaskState subtaskState)
 	{
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 		final AcknowledgeCheckpoint ackMessage = 
-				new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState);
+				new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointMetaData, subtaskState);
 
 		if (checkpointCoordinator != null) {
 			getRpcService().execute(new Runnable() {
@@ -543,10 +543,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					try {
 						if (!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
 							log.info("Received message for non-existing checkpoint {}.",
-									checkpointInfo.getCheckpointId());
+									checkpointMetaData.getCheckpointId());
 						}
 					} catch (Exception e) {
-						log.error("Error in CheckpointCoordinator while processing {}", checkpointInfo, e);
+						log.error("Error in CheckpointCoordinator while processing {}", checkpointMetaData, e);
 					}
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 9669da0..c18da67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
@@ -39,13 +39,13 @@ public class RpcCheckpointResponder implements CheckpointResponder {
 			JobID jobID,
 			ExecutionAttemptID executionAttemptID,
 			CheckpointMetaData checkpointMetaData,
-			CheckpointStateHandles checkpointStateHandles) {
+			SubtaskState subtaskState) {
 
 		checkpointCoordinatorGateway.acknowledgeCheckpoint(
 			jobID,
 			executionAttemptID,
 			checkpointMetaData,
-			checkpointStateHandles);
+			subtaskState);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index e9777a3..cb5dc31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -161,7 +161,6 @@ public class ZooKeeperUtils {
 	 */
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration) {
-	{
 		final CuratorFramework client = startCuratorFramework(configuration);
 		return createLeaderRetrievalService(client, configuration);
 	}
@@ -172,11 +171,10 @@ public class ZooKeeperUtils {
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object containing the configuration values
 	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
-	 * @throws Exception
 	 */
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 		final CuratorFramework client,
-		final Configuration configuration) throws Exception
+		final Configuration configuration)
 	{
 		return createLeaderRetrievalService(client, configuration, "");
 	}
@@ -188,12 +186,11 @@ public class ZooKeeperUtils {
 	 * @param configuration {@link Configuration} object containing the configuration values
 	 * @param pathSuffix    The path suffix which we want to append
 	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
-	 * @throws Exception
 	 */
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 		final CuratorFramework client,
 		final Configuration configuration,
-		final String pathSuffix) throws Exception
+		final String pathSuffix)
 	{
 		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
 			configuration,
@@ -240,12 +237,11 @@ public class ZooKeeperUtils {
 	 * @param configuration {@link Configuration} object containing the configuration values
 	 * @param pathSuffix    The path suffix which we want to append
 	 * @return {@link ZooKeeperLeaderElectionService} instance.
-	 * @throws Exception
 	 */
 	public static ZooKeeperLeaderElectionService createLeaderElectionService(
 		final CuratorFramework client,
 		final Configuration configuration,
-		final String pathSuffix) throws Exception
+		final String pathSuffix)
 	{
 		final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
 			configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 260b4d4..3c45ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -168,6 +168,8 @@ public class JobSubmitTest {
 
 			JobVertex jobVertex = new JobVertex("Vertex that fails in initializeOnMaster") {
 
+				private static final long serialVersionUID = -3540303593784587652L;
+
 				@Override
 				public void initializeOnMaster(ClassLoader loader) throws Exception {
 					throw new RuntimeException("test exception");
@@ -217,7 +219,7 @@ public class JobSubmitTest {
 	private JobGraph createSimpleJobGraph() {
 		JobVertex jobVertex = new JobVertex("Vertex");
 
-		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
 		List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
 
 		JobGraph jg = new JobGraph("test job", jobVertex);

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index f2616b5..3eba048 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
 import java.util.Collections;
 import java.util.Map;
@@ -92,7 +93,7 @@ public class DummyEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("foo", new Configuration(), "foo");
+		return new TestingTaskManagerRuntimeInfo();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 2a9ff61..7494d7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -45,10 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
@@ -57,9 +53,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.net.URL;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertFalse;

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index e96109e..bc62890 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -794,7 +795,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
 		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
-		when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
+		when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
 		when(task.getEnvironment()).thenReturn(env);
 		return task;