You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/03 20:50:26 UTC
[6/8] flink git commit: [FLINK-2425] [runtime] Cleanup code for
forwarding config and hostname into TaskManager's RuntimeEnvironment
[FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ef61de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ef61de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ef61de
Branch: refs/heads/master
Commit: c3ef61de934a9c447ec442449c527ce719ee46c6
Parents: 5bf2197
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 17:41:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:49:45 2015 +0200
----------------------------------------------------------------------
.../runtime/taskmanager/RuntimeEnvironment.java | 12 ++--
.../apache/flink/runtime/taskmanager/Task.java | 8 +--
.../taskmanager/TaskManagerRuntimeInfo.java | 61 ++++++++++++++++++++
.../taskmanager/RuntimeConfiguration.scala | 23 --------
.../flink/runtime/taskmanager/TaskManager.scala | 8 ++-
.../runtime/taskmanager/TaskAsyncCallTest.java | 6 +-
.../flink/runtime/taskmanager/TaskTest.java | 8 +--
7 files changed, 80 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c0dfee6..cd6dbd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment {
private final AccumulatorRegistry accumulatorRegistry;
- private Configuration taskManagerConfiguration;
+ private final Configuration taskManagerConfiguration;
- private String hostname;
+ private final String hostname;
// ------------------------------------------------------------------------
@@ -95,13 +95,13 @@ public class RuntimeEnvironment implements Environment {
MemoryManager memManager,
IOManager ioManager,
BroadcastVariableManager bcVarManager,
- AccumulatorRegistry accumulatorRegistry,
+ AccumulatorRegistry accumulatorRegistry,
InputSplitProvider splitProvider,
Map<String, Future<Path>> distCacheEntries,
ResultPartitionWriter[] writers,
InputGate[] inputGates,
ActorGateway jobManager,
- RuntimeConfiguration taskManagerConfig) {
+ TaskManagerRuntimeInfo taskManagerInfo) {
checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
@@ -124,8 +124,8 @@ public class RuntimeEnvironment implements Environment {
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
- this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
- this.hostname = taskManagerConfig.hostname();
+ this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
+ this.hostname = taskManagerInfo.getHostname();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 878a69a..36de90a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -145,6 +145,9 @@ public class Task implements Runnable {
/** The name of the class that holds the invokable code */
private final String nameOfInvokableClass;
+ /** Access to the task manager's configuration and host name */
+ private final TaskManagerRuntimeInfo taskManagerConfig;
+
/** The memory manager to be used by this task */
private final MemoryManager memoryManager;
@@ -214,9 +217,6 @@ public class Task implements Runnable {
* initialization, to be memory friendly */
private volatile SerializedValue<StateHandle<?>> operatorState;
- /** Access to task manager configuration and host names*/
- private RuntimeConfiguration taskManagerConfig;
-
/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
@@ -231,7 +231,7 @@ public class Task implements Runnable {
FiniteDuration actorAskTimeout,
LibraryCacheManager libraryCache,
FileCache fileCache,
- RuntimeConfiguration taskManagerConfig)
+ TaskManagerRuntimeInfo taskManagerConfig)
{
checkArgument(tdd.getNumberOfSubtasks() > 0);
checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..8d06f10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Serializable holder for TaskManager runtime information: its hostname and its configuration.
+ */
+public class TaskManagerRuntimeInfo implements java.io.Serializable {
+
+ private static final long serialVersionUID = 5598219619760274072L;
+
+ /** The host name of the interface that the TaskManager uses to communicate; set once in the constructor. */
+ private final String hostname;
+
+ /** The configuration that the TaskManager was started with; held by reference, not copied. */
+ private final Configuration configuration;
+
+ /**
+ * Creates a runtime info; both arguments are stored as given (no null check, no defensive copy).
+ * @param hostname The host name of the interface that the TaskManager uses to communicate.
+ * @param configuration The configuration that the TaskManager was started with.
+ */
+ public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
+ this.hostname = hostname;
+ this.configuration = configuration;
+ }
+
+ /**
+ * Gets the host name of the interface that the TaskManager uses to communicate.
+ * @return The host name exactly as it was passed to the constructor.
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * Gets the configuration that the TaskManager was started with.
+ * @return The same configuration instance that was passed to the constructor (not a copy).
+ */
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
deleted file mode 100644
index ef0e705..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.configuration.UnmodifiableConfiguration
-
-case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration)
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3ab271a..cc8b8ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -173,6 +173,10 @@ class TaskManager(
private val currentRegistrationSessionID: UUID = UUID.randomUUID()
+ private val runtimeInfo = new TaskManagerRuntimeInfo(
+ connectionInfo.getHostname(),
+ new UnmodifiableConfiguration(config.configuration))
+
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
@@ -893,9 +897,7 @@ class TaskManager(
config.timeout,
libCache,
fileCache,
- new RuntimeConfiguration(
- self.path.toSerializationFormat,
- new UnmodifiableConfiguration(config.configuration)))
+ runtimeInfo)
log.info(s"Received task ${task.getTaskNameWithSubtasks}")
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 08f0094..a7d8d8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -129,7 +128,6 @@ public class TaskAsyncCallTest {
}
private static Task createTask() {
-
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -161,9 +159,7 @@ public class TaskAsyncCallTest {
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
- new RuntimeConfiguration(
- taskManagerGateway.path(),
- new UnmodifiableConfiguration(new Configuration())));
+ new TaskManagerRuntimeInfo("localhost", new Configuration()));
}
public static class CheckpointsInOrderInvokable extends AbstractInvokable
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6d9df6d..0cba533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.taskmanager;
import com.google.common.collect.Maps;
+
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
import scala.concurrent.duration.FiniteDuration;
import java.lang.reflect.Field;
@@ -727,9 +727,7 @@ public class TaskTest {
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
- new RuntimeConfiguration(
- taskManagerGateway.path(),
- new UnmodifiableConfiguration(new Configuration())));
+ new TaskManagerRuntimeInfo("localhost", new Configuration()));
}
private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {