You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 17:47:55 UTC

[11/12] flink git commit: [hotfix] [runtime] Migrate NetworkEnvironmentConfiguration to Java

[hotfix] [runtime] Migrate NetworkEnvironmentConfiguration to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/710c08b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/710c08b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/710c08b8

Branch: refs/heads/master
Commit: 710c08b8a6e3b8888308679cf3c16761cffcae9c
Parents: 606c592
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 5 16:40:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 .../taskexecutor/TaskManagerServices.java       |   2 +-
 .../NetworkEnvironmentConfiguration.java        | 206 +++++++++++++++++++
 .../NetworkEnvironmentConfiguration.scala       |  36 ----
 ...askManagerComponentsStartupShutdownTest.java |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   2 +-
 5 files changed, 209 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ecf81d9..86a2fdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -388,7 +388,7 @@ public class TaskManagerServices {
 			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
 			networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
 			networkEnvironmentConfiguration.networkBuffersPerChannel(),
-			networkEnvironmentConfiguration.extraNetworkBuffersPerGate());
+			networkEnvironmentConfiguration.floatingNetworkBuffersPerGate());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
new file mode 100644
index 0000000..193fd90
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Configuration object for the network stack.
+ */
+public class NetworkEnvironmentConfiguration {
+
+	private final float networkBufFraction;
+
+	private final long networkBufMin;
+
+	private final long networkBufMax;
+
+	private final int networkBufferSize;
+
+	private final MemoryType memoryType;
+
+	private final IOMode ioMode;
+
+	private final int partitionRequestInitialBackoff;
+
+	private final int partitionRequestMaxBackoff;
+
+	private final int networkBuffersPerChannel;
+
+	private final int floatingNetworkBuffersPerGate;
+
+	private final NettyConfig nettyConfig;
+
+	/**
+	 * Constructor for a setup with purely local communication (no netty).
+	 */
+	public NetworkEnvironmentConfiguration(
+			float networkBufFraction,
+			long networkBufMin,
+			long networkBufMax,
+			int networkBufferSize,
+			MemoryType memoryType,
+			IOMode ioMode,
+			int partitionRequestInitialBackoff,
+			int partitionRequestMaxBackoff,
+			int networkBuffersPerChannel,
+			int floatingNetworkBuffersPerGate) {
+
+		this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
+				memoryType, ioMode,
+				partitionRequestInitialBackoff, partitionRequestMaxBackoff,
+				networkBuffersPerChannel, floatingNetworkBuffersPerGate,
+				null);
+		
+	}
+
+	public NetworkEnvironmentConfiguration(
+			float networkBufFraction,
+			long networkBufMin,
+			long networkBufMax,
+			int networkBufferSize,
+			MemoryType memoryType,
+			IOMode ioMode,
+			int partitionRequestInitialBackoff,
+			int partitionRequestMaxBackoff,
+			int networkBuffersPerChannel,
+			int floatingNetworkBuffersPerGate,
+			@Nullable NettyConfig nettyConfig) {
+
+		this.networkBufFraction = networkBufFraction;
+		this.networkBufMin = networkBufMin;
+		this.networkBufMax = networkBufMax;
+		this.networkBufferSize = networkBufferSize;
+		this.memoryType = memoryType;
+		this.ioMode = ioMode;
+		this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
+		this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
+		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+		this.nettyConfig = nettyConfig;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public float networkBufFraction() {
+		return networkBufFraction;
+	}
+
+	public long networkBufMin() {
+		return networkBufMin;
+	}
+
+	public long networkBufMax() {
+		return networkBufMax;
+	}
+
+	public int networkBufferSize() {
+		return networkBufferSize;
+	}
+
+	public MemoryType memoryType() {
+		return memoryType;
+	}
+
+	public IOMode ioMode() {
+		return ioMode;
+	}
+
+	public int partitionRequestInitialBackoff() {
+		return partitionRequestInitialBackoff;
+	}
+
+	public int partitionRequestMaxBackoff() {
+		return partitionRequestMaxBackoff;
+	}
+
+	public int networkBuffersPerChannel() {
+		return networkBuffersPerChannel;
+	}
+
+	public int floatingNetworkBuffersPerGate() {
+		return floatingNetworkBuffersPerGate;
+	}
+
+	public NettyConfig nettyConfig() {
+		return nettyConfig;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int result = 1;
+		result = 31 * result + networkBufferSize;
+		result = 31 * result + memoryType.hashCode();
+		result = 31 * result + ioMode.hashCode();
+		result = 31 * result + partitionRequestInitialBackoff;
+		result = 31 * result + partitionRequestMaxBackoff;
+		result = 31 * result + networkBuffersPerChannel;
+		result = 31 * result + floatingNetworkBuffersPerGate;
+		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
+		return result;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		else if (obj == null || getClass() != obj.getClass()) {
+			return false;
+		}
+		else {
+			final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;
+
+			return this.networkBufFraction == that.networkBufFraction &&
+					this.networkBufMin == that.networkBufMin &&
+					this.networkBufMax == that.networkBufMax &&
+					this.networkBufferSize == that.networkBufferSize &&
+					this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
+					this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
+					this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
+					this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
+					this.memoryType == that.memoryType &&
+					this.ioMode == that.ioMode && 
+					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "NetworkEnvironmentConfiguration{" +
+				"networkBufFraction=" + networkBufFraction +
+				", networkBufMin=" + networkBufMin +
+				", networkBufMax=" + networkBufMax +
+				", networkBufferSize=" + networkBufferSize +
+				", memoryType=" + memoryType +
+				", ioMode=" + ioMode +
+				", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
+				", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
+				", networkBuffersPerChannel=" + networkBuffersPerChannel +
+				", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
+				", nettyConfig=" + nettyConfig +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
deleted file mode 100644
index d74bb3b..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.core.memory.MemoryType
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.network.netty.NettyConfig
-
-case class NetworkEnvironmentConfiguration(
-    networkBufFraction: Float,
-    networkBufMin: Long,
-    networkBufMax: Long,
-    networkBufferSize: Int,
-    memoryType: MemoryType,
-    ioMode: IOMode,
-    partitionRequestInitialBackoff : Int,
-    partitionRequestMaxBackoff : Int,
-    networkBuffersPerChannel: Int,
-    extraNetworkBuffersPerGate: Int,
-    nettyConfig: NettyConfig = null)

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7837b27..2a4c036 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -145,7 +145,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				netConf.partitionRequestInitialBackoff(),
 				netConf.partitionRequestMaxBackoff(),
 				netConf.networkBuffersPerChannel(),
-				netConf.extraNetworkBuffersPerGate());
+				netConf.floatingNetworkBuffersPerGate());
 
 			network.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0f5afc0..e790ea8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1047,7 +1047,7 @@ public class TaskManagerTest extends TestLogger {
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
 		assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
-		assertEquals(tmConfig.getNetworkConfig().extraNetworkBuffersPerGate(), 100);
+		assertEquals(tmConfig.getNetworkConfig().floatingNetworkBuffersPerGate(), 100);
 	}
 
 	/**