You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/03 20:50:26 UTC

[6/8] flink git commit: [FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment

[FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ef61de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ef61de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ef61de

Branch: refs/heads/master
Commit: c3ef61de934a9c447ec442449c527ce719ee46c6
Parents: 5bf2197
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 17:41:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:49:45 2015 +0200

----------------------------------------------------------------------
 .../runtime/taskmanager/RuntimeEnvironment.java | 12 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  8 +--
 .../taskmanager/TaskManagerRuntimeInfo.java     | 61 ++++++++++++++++++++
 .../taskmanager/RuntimeConfiguration.scala      | 23 --------
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  6 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  8 +--
 7 files changed, 80 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c0dfee6..cd6dbd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
-	private Configuration taskManagerConfiguration;
+	private final Configuration taskManagerConfiguration;
 
-	private String hostname;
+	private final String hostname;
 
 	// ------------------------------------------------------------------------
 
@@ -95,13 +95,13 @@ public class RuntimeEnvironment implements Environment {
 			MemoryManager memManager,
 			IOManager ioManager,
 			BroadcastVariableManager bcVarManager,
-								AccumulatorRegistry accumulatorRegistry,
+			AccumulatorRegistry accumulatorRegistry,
 			InputSplitProvider splitProvider,
 			Map<String, Future<Path>> distCacheEntries,
 			ResultPartitionWriter[] writers,
 			InputGate[] inputGates,
 			ActorGateway jobManager,
-			RuntimeConfiguration taskManagerConfig) {
+			TaskManagerRuntimeInfo taskManagerInfo) {
 		
 		checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
 
@@ -124,8 +124,8 @@ public class RuntimeEnvironment implements Environment {
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
-		this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
-		this.hostname = taskManagerConfig.hostname();
+		this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
+		this.hostname = taskManagerInfo.getHostname();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 878a69a..36de90a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -145,6 +145,9 @@ public class Task implements Runnable {
 	/** The name of the class that holds the invokable code */
 	private final String nameOfInvokableClass;
 
+	/** Access to task manager configuration and host names*/
+	private final TaskManagerRuntimeInfo taskManagerConfig;
+	
 	/** The memory manager to be used by this task */
 	private final MemoryManager memoryManager;
 
@@ -214,9 +217,6 @@ public class Task implements Runnable {
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
 
-	/** Access to task manager configuration and host names*/
-	private RuntimeConfiguration taskManagerConfig;
-
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
 	 * be undone in the case of a failing task deployment.</p>
@@ -231,7 +231,7 @@ public class Task implements Runnable {
 				FiniteDuration actorAskTimeout,
 				LibraryCacheManager libraryCache,
 				FileCache fileCache,
-				RuntimeConfiguration taskManagerConfig)
+				TaskManagerRuntimeInfo taskManagerConfig)
 	{
 		checkArgument(tdd.getNumberOfSubtasks() > 0);
 		checkArgument(tdd.getIndexInSubtaskGroup() >= 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..8d06f10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Encapsulation of TaskManager runtime information, like hostname and configuration.
+ */
+public class TaskManagerRuntimeInfo implements java.io.Serializable {
+
+	private static final long serialVersionUID = 5598219619760274072L;
+	
+	/** host name of the interface that the TaskManager uses to communicate */
+	private final String hostname;
+
+	/** configuration that the TaskManager was started with */
+	private final Configuration configuration;
+
+	/**
+	 * Creates a runtime info.
+	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
+	 * @param configuration The configuration that the TaskManager was started with.
+	 */
+	public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
+		this.hostname = hostname;
+		this.configuration = configuration;
+	}
+
+	/**
+	 * Gets host name of the interface that the TaskManager uses to communicate.
+	 * @return The host name of the interface that the TaskManager uses to communicate.
+	 */
+	public String getHostname() {
+		return hostname;
+	}
+
+	/**
+	 * Gets the configuration that the TaskManager was started with.
+	 * @return The configuration that the TaskManager was started with.
+	 */
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
deleted file mode 100644
index ef0e705..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.configuration.UnmodifiableConfiguration
-
-case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration)

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3ab271a..cc8b8ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -173,6 +173,10 @@ class TaskManager(
 
   private val currentRegistrationSessionID: UUID = UUID.randomUUID()
 
+  private val runtimeInfo = new TaskManagerRuntimeInfo(
+       connectionInfo.getHostname(),
+       new UnmodifiableConfiguration(config.configuration))
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -893,9 +897,7 @@ class TaskManager(
         config.timeout,
         libCache,
         fileCache,
-        new RuntimeConfiguration(
-          self.path.toSerializationFormat,
-          new UnmodifiableConfiguration(config.configuration)))
+        runtimeInfo)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 08f0094..a7d8d8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -129,7 +128,6 @@ public class TaskAsyncCallTest {
 	}
 	
 	private static Task createTask() {
-		
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 		
@@ -161,9 +159,7 @@ public class TaskAsyncCallTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new RuntimeConfiguration(
-						taskManagerGateway.path(),
-						new UnmodifiableConfiguration(new Configuration())));
+				new TaskManagerRuntimeInfo("localhost", new Configuration()));
 	}
 	
 	public static class CheckpointsInOrderInvokable extends AbstractInvokable

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6d9df6d..0cba533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
@@ -727,9 +727,7 @@ public class TaskTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new RuntimeConfiguration(
-						taskManagerGateway.path(),
-						new UnmodifiableConfiguration(new Configuration())));
+				new TaskManagerRuntimeInfo("localhost", new Configuration()));
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {