You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/12 09:05:11 UTC
flink git commit: [FLINK-4746] Make TaskManagerRuntimeInfo an
interface
Repository: flink
Updated Branches:
refs/heads/flip-6 8c656d925 -> 64ee13862
[FLINK-4746] Make TaskManagerRuntimeInfo an interface
Let the TaskManagerConfiguration implement the TaskManagerRuntimeInformation to make some of
the TaskManager's configuration values accessible from different components.
This closes #2599.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64ee1386
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64ee1386
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64ee1386
Branch: refs/heads/flip-6
Commit: 64ee1386294cde6c61c8ee15c5e1d1ad018dcc46
Parents: 8c656d9
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 5 14:47:24 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Oct 12 11:04:34 2016 +0200
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 11 +---
.../taskexecutor/TaskManagerConfiguration.java | 22 +++----
.../taskmanager/TaskManagerRuntimeInfo.java | 61 ++------------------
.../flink/runtime/taskmanager/TaskManager.scala | 11 +---
.../operators/drivers/TestTaskContext.java | 4 +-
.../testutils/BinaryOperatorTestBase.java | 4 +-
.../operators/testutils/DriverTestBase.java | 4 +-
.../operators/testutils/MockEnvironment.java | 10 +---
.../testutils/UnaryOperatorTestBase.java | 4 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 8 +--
.../runtime/taskmanager/TaskAsyncCallTest.java | 3 +-
.../flink/runtime/taskmanager/TaskTest.java | 3 +-
.../util/TestingTaskManagerRuntimeInfo.java | 52 +++++++++++++++++
.../tasks/InterruptSensitiveRestoreTest.java | 5 +-
.../runtime/tasks/StreamMockEnvironment.java | 6 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 4 +-
16 files changed, 95 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 35b639b..a2716e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.taskexecutor;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;
import java.util.HashSet;
@@ -127,9 +125,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private final FileCache fileCache;
- // TODO: Try to get rid of it
- private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
-
// --------- resource manager --------
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@@ -177,10 +172,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
this.fileCache = checkNotNull(fileCache);
- this.taskManagerRuntimeInfo = new TaskManagerRuntimeInfo(
- taskManagerLocation.getHostname(),
- new UnmodifiableConfiguration(taskManagerConfiguration.getConfiguration()),
- taskManagerConfiguration.getTmpDirPaths());
this.jobManagerConnections = new HashMap<>(4);
@@ -308,7 +299,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
checkpointResponder,
libraryCache,
fileCache,
- taskManagerRuntimeInfo,
+ taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index bce3dc3..1d1e732 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,13 +34,13 @@ import java.io.File;
/**
* Configuration object for {@link TaskExecutor}.
*/
-public class TaskManagerConfiguration {
+public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);
private final int numberSlots;
- private final String[] tmpDirPaths;
+ private final String[] tmpDirectories;
private final Time timeout;
// null indicates an infinite duration
@@ -50,12 +51,11 @@ public class TaskManagerConfiguration {
private final long cleanupInterval;
- // TODO: remove necessity for complete configuration object
- private final Configuration configuration;
+ private final UnmodifiableConfiguration configuration;
public TaskManagerConfiguration(
int numberSlots,
- String[] tmpDirPaths,
+ String[] tmpDirectories,
Time timeout,
Time maxRegistrationDuration,
Time initialRegistrationPause,
@@ -65,7 +65,7 @@ public class TaskManagerConfiguration {
Configuration configuration) {
this.numberSlots = numberSlots;
- this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
+ this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
this.timeout = Preconditions.checkNotNull(timeout);
this.maxRegistrationDuration = maxRegistrationDuration;
this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
@@ -79,10 +79,6 @@ public class TaskManagerConfiguration {
return numberSlots;
}
- public String[] getTmpDirPaths() {
- return tmpDirPaths;
- }
-
public Time getTimeout() {
return timeout;
}
@@ -107,10 +103,16 @@ public class TaskManagerConfiguration {
return cleanupInterval;
}
+ @Override
public Configuration getConfiguration() {
return configuration;
}
+ @Override
+ public String[] getTmpDirectories() {
+ return tmpDirectories;
+ }
+
// --------------------------------------------------------------------------------------------
// Static factory methods
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 9ac982e..d1efe34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -20,71 +20,22 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.configuration.Configuration;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
/**
- * Encapsulation of TaskManager runtime information, like hostname and configuration.
+ * Interface to access {@link TaskManager} information.
*/
-public class TaskManagerRuntimeInfo implements java.io.Serializable {
-
- private static final long serialVersionUID = 5598219619760274072L;
-
- /** host name of the interface that the TaskManager uses to communicate */
- private final String hostname;
-
- /** configuration that the TaskManager was started with */
- private final Configuration configuration;
-
- /** list of temporary file directories */
- private final String[] tmpDirectories;
-
- /**
- * Creates a runtime info.
- *
- * @param hostname The host name of the interface that the TaskManager uses to communicate.
- * @param configuration The configuration that the TaskManager was started with.
- * @param tmpDirectory The temporary file directory.
- */
- public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) {
- this(hostname, configuration, new String[] { tmpDirectory });
- }
-
- /**
- * Creates a runtime info.
- * @param hostname The host name of the interface that the TaskManager uses to communicate.
- * @param configuration The configuration that the TaskManager was started with.
- * @param tmpDirectories The list of temporary file directories.
- */
- public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) {
- checkArgument(tmpDirectories.length > 0);
- this.hostname = checkNotNull(hostname);
- this.configuration = checkNotNull(configuration);
- this.tmpDirectories = tmpDirectories;
-
- }
-
- /**
- * Gets host name of the interface that the TaskManager uses to communicate.
- * @return The host name of the interface that the TaskManager uses to communicate.
- */
- public String getHostname() {
- return hostname;
- }
+public interface TaskManagerRuntimeInfo {
/**
* Gets the configuration that the TaskManager was started with.
+ *
* @return The configuration that the TaskManager was started with.
*/
- public Configuration getConfiguration() {
- return configuration;
- }
+ Configuration getConfiguration();
/**
* Gets the list of temporary file directories.
+ *
* @return The list of temporary file directories.
*/
- public String[] getTmpDirectories() {
- return tmpDirectories;
- }
+ String[] getTmpDirectories();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index da8c14e..26e13ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -149,7 +149,7 @@ class TaskManager(
protected val bcVarManager = new BroadcastVariableManager()
/** Handler for distributed files cached by this TaskManager */
- protected val fileCache = new FileCache(config.getTmpDirPaths())
+ protected val fileCache = new FileCache(config.getTmpDirectories())
/** Registry of metrics periodically transmitted to the JobManager */
private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -183,11 +183,6 @@ class TaskManager(
var leaderSessionID: Option[UUID] = None
- private val runtimeInfo = new TaskManagerRuntimeInfo(
- location.getHostname(),
- new UnmodifiableConfiguration(config.getConfiguration()),
- config.getTmpDirPaths())
-
private var scheduledTaskManagerRegistration: Option[Cancellable] = None
private var currentRegistrationRun: UUID = UUID.randomUUID()
@@ -995,7 +990,7 @@ class TaskManager(
}
taskManagerMetricGroup =
- new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
+ new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString)
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network)
@@ -1179,7 +1174,7 @@ class TaskManager(
checkpointResponder,
libCache,
fileCache,
- runtimeInfo,
+ config,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 62110a7..d34bb40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -74,8 +75,7 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
public TestTaskContext(long memoryInBytes) {
this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 1024, MemoryType.HEAP, true);
- this.taskManageInfo = new TaskManagerRuntimeInfo(
- "localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+ this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 75f960e..3d4c45f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TestLogger;
@@ -110,8 +111,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
this.owner = new DummyInvokable();
this.taskConfig = new TaskConfig(new Configuration());
this.executionConfig = executionConfig;
- this.taskManageInfo = new TaskManagerRuntimeInfo(
- "localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+ this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
}
@Parameterized.Parameters
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 088435a..f43632c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -115,8 +116,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta
this.owner = new DummyInvokable();
this.taskConfig = new TaskConfig(new Configuration());
this.executionConfig = executionConfig;
- this.taskManageInfo = new TaskManagerRuntimeInfo(
- "localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+ this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
}
@Parameterized.Parameters
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c3ed6c0..d2d4094 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -45,12 +44,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.mockito.invocation.InvocationOnMock;
@@ -236,10 +233,7 @@ public class MockEnvironment implements Environment {
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
- return new TaskManagerRuntimeInfo(
- "localhost",
- new UnmodifiableConfiguration(new Configuration()),
- System.getProperty("java.io.tmpdir"));
+ return new TestingTaskManagerRuntimeInfo();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index a94e694..85137cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -115,8 +116,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
this.executionConfig = executionConfig;
this.comparators = new ArrayList<TypeComparator<IN>>(2);
- this.taskManageInfo = new TaskManagerRuntimeInfo(
- "localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+ this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
}
@Parameterized.Parameters
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index f5fe52c..ecbd9b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -63,14 +63,11 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
- PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
- PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
rpc.registerGateway(resourceManagerAddress, rmGateway);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
- when(taskManagerLocation.getHostname()).thenReturn("foobar");
NonHaServices haServices = new NonHaServices(resourceManagerAddress);
@@ -124,7 +121,7 @@ public class TaskExecutorTest extends TestLogger {
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
- PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
+ PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
@@ -198,12 +195,9 @@ public class TaskExecutorTest extends TestLogger {
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
- PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
- PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
- when(taskManagerLocation.getHostname()).thenReturn("foobar");
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index aea5294..090880f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.junit.Before;
import org.junit.Test;
@@ -179,7 +180,7 @@ public class TaskAsyncCallTest {
mock(CheckpointResponder.class),
libCache,
mock(FileCache.class),
- new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+ new TestingTaskManagerRuntimeInfo(),
mock(TaskMetricGroup.class),
consumableNotifier,
partitionStateChecker,
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fe618ff..50fc181 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.junit.After;
import org.junit.Before;
@@ -648,7 +649,7 @@ public class TaskTest {
checkpointResponder,
libCache,
mock(FileCache.class),
- new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+ new TestingTaskManagerRuntimeInfo(),
mock(TaskMetricGroup.class),
consumableNotifier,
partitionStateChecker,
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..e56da97
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
+import java.io.File;
+
+/**
+ * TaskManagerRuntimeInfo implementation for testing purposes
+ */
+public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
+
+ private final Configuration configuration;
+ private final String[] tmpDirectories;
+
+ public TestingTaskManagerRuntimeInfo() {
+ this(new Configuration(), System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator));
+ }
+
+ public TestingTaskManagerRuntimeInfo(Configuration configuration, String[] tmpDirectories) {
+ this.configuration = configuration;
+ this.tmpDirectories = tmpDirectories;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public String[] getTmpDirectories() {
+ return tmpDirectories;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index ffda126..fb1b3b3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -48,8 +48,8 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -167,8 +167,7 @@ public class InterruptSensitiveRestoreTest {
mock(CheckpointResponder.class),
new FallbackLibraryCacheManager(),
new FileCache(tmpDirectories),
- new TaskManagerRuntimeInfo(
- "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+ new TestingTaskManagerRuntimeInfo(new Configuration(), tmpDirectories),
new UnregisteredTaskMetricsGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionStateChecker.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9b773d8..6f9d8dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -49,12 +49,10 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -336,7 +334,7 @@ public class StreamMockEnvironment implements Environment {
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
- return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+ return new TestingTaskManagerRuntimeInfo();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5a8ca04..205fba0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -221,7 +221,7 @@ public class StreamTaskTest {
mock(CheckpointResponder.class),
libCache,
mock(FileCache.class),
- new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+ new TestingTaskManagerRuntimeInfo(),
mock(TaskMetricGroup.class),
consumableNotifier,
partitionStateChecker,