You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:10 UTC

[30/50] [abbrv] flink git commit: [FLINK-4746] Make TaskManagerRuntimeInfo an interface

[FLINK-4746] Make TaskManagerRuntimeInfo an interface

Let the TaskManagerConfiguration implement the TaskManagerRuntimeInfo interface to make some of
the TaskManager's configuration values accessible from different components.

This closes #2599.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4596f1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4596f1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4596f1e

Branch: refs/heads/flip-6
Commit: a4596f1e21157066a3728f4c13209d41a3cd6858
Parents: 5275586
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 5 14:47:24 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:48:30 2016 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 11 +---
 .../taskexecutor/TaskManagerConfiguration.java  | 22 +++----
 .../taskmanager/TaskManagerRuntimeInfo.java     | 61 ++------------------
 .../flink/runtime/taskmanager/TaskManager.scala | 11 +---
 .../operators/drivers/TestTaskContext.java      |  4 +-
 .../testutils/BinaryOperatorTestBase.java       |  4 +-
 .../operators/testutils/DriverTestBase.java     |  4 +-
 .../operators/testutils/MockEnvironment.java    |  8 +--
 .../testutils/UnaryOperatorTestBase.java        |  4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  8 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  4 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  3 +-
 .../util/TestingTaskManagerRuntimeInfo.java     | 52 +++++++++++++++++
 .../tasks/InterruptSensitiveRestoreTest.java    |  5 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  4 +-
 16 files changed, 98 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 35b639b..a2716e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashSet;
@@ -127,9 +125,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private final FileCache fileCache;
 
-	// TODO: Try to get rid of it
-	private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
-
 	// --------- resource manager --------
 
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@@ -177,10 +172,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
 		this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
 		this.fileCache = checkNotNull(fileCache);
-		this.taskManagerRuntimeInfo = new TaskManagerRuntimeInfo(
-			taskManagerLocation.getHostname(),
-			new UnmodifiableConfiguration(taskManagerConfiguration.getConfiguration()),
-			taskManagerConfiguration.getTmpDirPaths());
 
 		this.jobManagerConnections = new HashMap<>(4);
 
@@ -308,7 +299,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			checkpointResponder,
 			libraryCache,
 			fileCache,
-			taskManagerRuntimeInfo,
+			taskManagerConfiguration,
 			taskMetricGroup,
 			resultPartitionConsumableNotifier,
 			partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index bce3dc3..1d1e732 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,13 +34,13 @@ import java.io.File;
 /**
  * Configuration object for {@link TaskExecutor}.
  */
-public class TaskManagerConfiguration {
+public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);
 
 	private final int numberSlots;
 
-	private final String[] tmpDirPaths;
+	private final String[] tmpDirectories;
 
 	private final Time timeout;
 	// null indicates an infinite duration
@@ -50,12 +51,11 @@ public class TaskManagerConfiguration {
 
 	private final long cleanupInterval;
 
-	// TODO: remove necessity for complete configuration object
-	private final Configuration configuration;
+	private final UnmodifiableConfiguration configuration;
 
 	public TaskManagerConfiguration(
 		int numberSlots,
-		String[] tmpDirPaths,
+		String[] tmpDirectories,
 		Time timeout,
 		Time maxRegistrationDuration,
 		Time initialRegistrationPause,
@@ -65,7 +65,7 @@ public class TaskManagerConfiguration {
 		Configuration configuration) {
 
 		this.numberSlots = numberSlots;
-		this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
+		this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.maxRegistrationDuration = maxRegistrationDuration;
 		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
@@ -79,10 +79,6 @@ public class TaskManagerConfiguration {
 		return numberSlots;
 	}
 
-	public String[] getTmpDirPaths() {
-		return tmpDirPaths;
-	}
-
 	public Time getTimeout() {
 		return timeout;
 	}
@@ -107,10 +103,16 @@ public class TaskManagerConfiguration {
 		return cleanupInterval;
 	}
 
+	@Override
 	public Configuration getConfiguration() {
 		return configuration;
 	}
 
+	@Override
+	public String[] getTmpDirectories() {
+		return tmpDirectories;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 9ac982e..d1efe34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -20,71 +20,22 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.Configuration;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
- * Encapsulation of TaskManager runtime information, like hostname and configuration.
+ * Interface to access {@link TaskManager} information.
  */
-public class TaskManagerRuntimeInfo implements java.io.Serializable {
-
-	private static final long serialVersionUID = 5598219619760274072L;
-	
-	/** host name of the interface that the TaskManager uses to communicate */
-	private final String hostname;
-
-	/** configuration that the TaskManager was started with */
-	private final Configuration configuration;
-
-	/** list of temporary file directories */
-	private final String[] tmpDirectories;
-	
-	/**
-	 * Creates a runtime info.
-	 * 
-	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
-	 * @param configuration The configuration that the TaskManager was started with.
-	 * @param tmpDirectory The temporary file directory.   
-	 */
-	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) {
-		this(hostname, configuration, new String[] { tmpDirectory });
-	}
-	
-	/**
-	 * Creates a runtime info.
-	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
-	 * @param configuration The configuration that the TaskManager was started with.
-	 * @param tmpDirectories The list of temporary file directories.   
-	 */
-	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) {
-		checkArgument(tmpDirectories.length > 0);
-		this.hostname = checkNotNull(hostname);
-		this.configuration = checkNotNull(configuration);
-		this.tmpDirectories = tmpDirectories;
-		
-	}
-
-	/**
-	 * Gets host name of the interface that the TaskManager uses to communicate.
-	 * @return The host name of the interface that the TaskManager uses to communicate.
-	 */
-	public String getHostname() {
-		return hostname;
-	}
+public interface TaskManagerRuntimeInfo {
 
 	/**
 	 * Gets the configuration that the TaskManager was started with.
+	 *
 	 * @return The configuration that the TaskManager was started with.
 	 */
-	public Configuration getConfiguration() {
-		return configuration;
-	}
+	Configuration getConfiguration();
 
 	/**
 	 * Gets the list of temporary file directories.
+	 * 
 	 * @return The list of temporary file directories.
 	 */
-	public String[] getTmpDirectories() {
-		return tmpDirectories;
-	}
+	String[] getTmpDirectories();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7364ee0..1b56c92 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -150,7 +150,7 @@ class TaskManager(
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache(config.getTmpDirPaths())
+  protected val fileCache = new FileCache(config.getTmpDirectories())
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -184,11 +184,6 @@ class TaskManager(
 
   var leaderSessionID: Option[UUID] = None
 
-  private val runtimeInfo = new TaskManagerRuntimeInfo(
-       location.getHostname(),
-       new UnmodifiableConfiguration(config.getConfiguration()),
-       config.getTmpDirPaths())
-
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
 
@@ -996,7 +991,7 @@ class TaskManager(
     }
     
     taskManagerMetricGroup = 
-      new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
+      new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString)
     
     TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network)
     
@@ -1180,7 +1175,7 @@ class TaskManager(
         checkpointResponder,
         libCache,
         fileCache,
-        runtimeInfo,
+        config,
         taskMetricGroup,
         resultPartitionConsumableNotifier,
         partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 62110a7..d34bb40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -74,8 +75,7 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
 	
 	public TestTaskContext(long memoryInBytes) {
 		this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 1024, MemoryType.HEAP, true);
-		this.taskManageInfo = new TaskManagerRuntimeInfo(
-				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+		this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 75f960e..3d4c45f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.TestLogger;
@@ -110,8 +111,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 		this.owner = new DummyInvokable();
 		this.taskConfig = new TaskConfig(new Configuration());
 		this.executionConfig = executionConfig;
-		this.taskManageInfo = new TaskManagerRuntimeInfo(
-				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+		this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
 	}
 	
 	@Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 088435a..f43632c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
@@ -115,8 +116,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta
 		this.owner = new DummyInvokable();
 		this.taskConfig = new TaskConfig(new Configuration());
 		this.executionConfig = executionConfig;
-		this.taskManageInfo = new TaskManagerRuntimeInfo(
-				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+		this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
 	}
 
 	@Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 08b84cb..9b33071 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -48,8 +47,10 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -233,10 +234,7 @@ public class MockEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo(
-				"localhost",
-				new UnmodifiableConfiguration(new Configuration()),
-				System.getProperty("java.io.tmpdir"));
+		return new TestingTaskManagerRuntimeInfo();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index a94e694..85137cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -115,8 +116,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 		this.executionConfig = executionConfig;
 		this.comparators = new ArrayList<TypeComparator<IN>>(2);
 
-		this.taskManageInfo = new TaskManagerRuntimeInfo(
-				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+		this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
 	}
 
 	@Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index f5fe52c..ecbd9b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -63,14 +63,11 @@ public class TaskExecutorTest extends TestLogger {
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
-			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
 
 			rpc.registerGateway(resourceManagerAddress, rmGateway);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
 			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
 
@@ -124,7 +121,7 @@ public class TaskExecutorTest extends TestLogger {
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
-			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
+			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
@@ -198,12 +195,9 @@ public class TaskExecutorTest extends TestLogger {
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
-			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 3a87d86..8fa7463 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -49,7 +49,9 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -179,7 +181,7 @@ public class TaskAsyncCallTest {
 			mock(CheckpointResponder.class),
 			libCache,
 			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			new TestingTaskManagerRuntimeInfo(),
 			mock(TaskMetricGroup.class),
 			consumableNotifier,
 			partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fe618ff..50fc181 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.junit.After;
 import org.junit.Before;
@@ -648,7 +649,7 @@ public class TaskTest {
 			checkpointResponder,
 			libCache,
 			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			new TestingTaskManagerRuntimeInfo(),
 			mock(TaskMetricGroup.class),
 			consumableNotifier,
 			partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..e56da97
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
+import java.io.File;
+
+/**
+ * TaskManagerRuntimeInfo implementation for testing purposes
+ */
+public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
+
+	private final Configuration configuration;
+	private final String[] tmpDirectories;
+
+	public TestingTaskManagerRuntimeInfo() {
+		this(new Configuration(), System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator));
+	}
+
+	public TestingTaskManagerRuntimeInfo(Configuration configuration, String[] tmpDirectories) {
+		this.configuration = configuration;
+		this.tmpDirectories = tmpDirectories;
+	}
+
+	@Override
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	@Override
+	public String[] getTmpDirectories() {
+		return tmpDirectories;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 861665f..1077052 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -49,8 +49,8 @@ import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -175,8 +175,7 @@ public class InterruptSensitiveRestoreTest {
 				mock(CheckpointResponder.class),
 				new FallbackLibraryCacheManager(),
 				new FileCache(tmpDirectories),
-				new TaskManagerRuntimeInfo(
-						"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+				new TestingTaskManagerRuntimeInfo(new Configuration(), tmpDirectories),
 				new UnregisteredTaskMetricsGroup(),
 				mock(ResultPartitionConsumableNotifier.class),
 				mock(PartitionStateChecker.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 36ecf59..a73a1e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -52,6 +52,8 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -327,7 +329,7 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+		return new TestingTaskManagerRuntimeInfo();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index ee0839f..bb246f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -243,7 +243,7 @@ public class StreamTaskTest {
 			mock(CheckpointResponder.class),
 			libCache,
 			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+			new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),
 			new UnregisteredTaskMetricsGroup(),
 			consumableNotifier,
 			partitionStateChecker,