You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 17:47:55 UTC
[11/12] flink git commit: [hotfix] [runtime] Migrate
NetworkEnvironmentConfiguration to Java
[hotfix] [runtime] Migrate NetworkEnvironmentConfiguration to Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/710c08b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/710c08b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/710c08b8
Branch: refs/heads/master
Commit: 710c08b8a6e3b8888308679cf3c16761cffcae9c
Parents: 606c592
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 5 16:40:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200
----------------------------------------------------------------------
.../taskexecutor/TaskManagerServices.java | 2 +-
.../NetworkEnvironmentConfiguration.java | 206 +++++++++++++++++++
.../NetworkEnvironmentConfiguration.scala | 36 ----
...askManagerComponentsStartupShutdownTest.java | 2 +-
.../runtime/taskmanager/TaskManagerTest.java | 2 +-
5 files changed, 209 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ecf81d9..86a2fdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -388,7 +388,7 @@ public class TaskManagerServices {
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
networkEnvironmentConfiguration.networkBuffersPerChannel(),
- networkEnvironmentConfiguration.extraNetworkBuffersPerGate());
+ networkEnvironmentConfiguration.floatingNetworkBuffersPerGate());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
new file mode 100644
index 0000000..193fd90
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Configuration object for the network stack.
+ */
+public class NetworkEnvironmentConfiguration {
+
+ private final float networkBufFraction;
+
+ private final long networkBufMin;
+
+ private final long networkBufMax;
+
+ private final int networkBufferSize;
+
+ private final MemoryType memoryType;
+
+ private final IOMode ioMode;
+
+ private final int partitionRequestInitialBackoff;
+
+ private final int partitionRequestMaxBackoff;
+
+ private final int networkBuffersPerChannel;
+
+ private final int floatingNetworkBuffersPerGate;
+
+ private final NettyConfig nettyConfig;
+
+ /**
+ * Constructor for a setup with purely local communication (no netty).
+ */
+ public NetworkEnvironmentConfiguration(
+ float networkBufFraction,
+ long networkBufMin,
+ long networkBufMax,
+ int networkBufferSize,
+ MemoryType memoryType,
+ IOMode ioMode,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int floatingNetworkBuffersPerGate) {
+
+ this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
+ memoryType, ioMode,
+ partitionRequestInitialBackoff, partitionRequestMaxBackoff,
+ networkBuffersPerChannel, floatingNetworkBuffersPerGate,
+ null);
+
+ }
+
+ public NetworkEnvironmentConfiguration(
+ float networkBufFraction,
+ long networkBufMin,
+ long networkBufMax,
+ int networkBufferSize,
+ MemoryType memoryType,
+ IOMode ioMode,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int floatingNetworkBuffersPerGate,
+ @Nullable NettyConfig nettyConfig) {
+
+ this.networkBufFraction = networkBufFraction;
+ this.networkBufMin = networkBufMin;
+ this.networkBufMax = networkBufMax;
+ this.networkBufferSize = networkBufferSize;
+ this.memoryType = memoryType;
+ this.ioMode = ioMode;
+ this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
+ this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
+ this.networkBuffersPerChannel = networkBuffersPerChannel;
+ this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+ this.nettyConfig = nettyConfig;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public float networkBufFraction() {
+ return networkBufFraction;
+ }
+
+ public long networkBufMin() {
+ return networkBufMin;
+ }
+
+ public long networkBufMax() {
+ return networkBufMax;
+ }
+
+ public int networkBufferSize() {
+ return networkBufferSize;
+ }
+
+ public MemoryType memoryType() {
+ return memoryType;
+ }
+
+ public IOMode ioMode() {
+ return ioMode;
+ }
+
+ public int partitionRequestInitialBackoff() {
+ return partitionRequestInitialBackoff;
+ }
+
+ public int partitionRequestMaxBackoff() {
+ return partitionRequestMaxBackoff;
+ }
+
+ public int networkBuffersPerChannel() {
+ return networkBuffersPerChannel;
+ }
+
+ public int floatingNetworkBuffersPerGate() {
+ return floatingNetworkBuffersPerGate;
+ }
+
+ public NettyConfig nettyConfig() {
+ return nettyConfig;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int result = 1;
+ result = 31 * result + networkBufferSize;
+ result = 31 * result + memoryType.hashCode();
+ result = 31 * result + ioMode.hashCode();
+ result = 31 * result + partitionRequestInitialBackoff;
+ result = 31 * result + partitionRequestMaxBackoff;
+ result = 31 * result + networkBuffersPerChannel;
+ result = 31 * result + floatingNetworkBuffersPerGate;
+ result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ else if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ else {
+ final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;
+
+ return this.networkBufFraction == that.networkBufFraction &&
+ this.networkBufMin == that.networkBufMin &&
+ this.networkBufMax == that.networkBufMax &&
+ this.networkBufferSize == that.networkBufferSize &&
+ this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
+ this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
+ this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
+ this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
+ this.memoryType == that.memoryType &&
+ this.ioMode == that.ioMode &&
+ (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "NetworkEnvironmentConfiguration{" +
+ "networkBufFraction=" + networkBufFraction +
+ ", networkBufMin=" + networkBufMin +
+ ", networkBufMax=" + networkBufMax +
+ ", networkBufferSize=" + networkBufferSize +
+ ", memoryType=" + memoryType +
+ ", ioMode=" + ioMode +
+ ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
+ ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
+ ", networkBuffersPerChannel=" + networkBuffersPerChannel +
+ ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
+ ", nettyConfig=" + nettyConfig +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
deleted file mode 100644
index d74bb3b..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.core.memory.MemoryType
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.network.netty.NettyConfig
-
-case class NetworkEnvironmentConfiguration(
- networkBufFraction: Float,
- networkBufMin: Long,
- networkBufMax: Long,
- networkBufferSize: Int,
- memoryType: MemoryType,
- ioMode: IOMode,
- partitionRequestInitialBackoff : Int,
- partitionRequestMaxBackoff : Int,
- networkBuffersPerChannel: Int,
- extraNetworkBuffersPerGate: Int,
- nettyConfig: NettyConfig = null)
http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7837b27..2a4c036 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -145,7 +145,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
netConf.partitionRequestInitialBackoff(),
netConf.partitionRequestMaxBackoff(),
netConf.networkBuffersPerChannel(),
- netConf.extraNetworkBuffersPerGate());
+ netConf.floatingNetworkBuffersPerGate());
network.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0f5afc0..e790ea8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1047,7 +1047,7 @@ public class TaskManagerTest extends TestLogger {
assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
- assertEquals(tmConfig.getNetworkConfig().extraNetworkBuffersPerGate(), 100);
+ assertEquals(tmConfig.getNetworkConfig().floatingNetworkBuffersPerGate(), 100);
}
/**