Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/04/13 12:50:00 UTC

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/3721

    [FLINK-4545] replace the network buffers parameter

    (based on #3708 and #3713)
    
    Instead of the previous network buffers parameter, allow configuration via the following three new (more flexible) parameters:
    * `taskmanager.network.memory.fraction`: fraction of JVM memory to use for network buffers (default: 0.1)
    * `taskmanager.network.memory.min`: minimum memory size for network buffers (default: 64 MB)
    * `taskmanager.network.memory.max`: maximum memory size for network buffers (default: 1 GB)
    
    Note that I needed to adapt two unit tests that would otherwise have been killed on Travis CI: these defaults result in ~150MB of memory being used for network buffers, which apparently was too much there.
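
As a rough illustration of how the three parameters interact, the sketch below clamps `fraction * total JVM memory` to the `[min, max]` range, which is the relationship the unit tests quoted later in this thread assert. The class and method names are hypothetical and are not taken from the Flink code base; the defaults used in `main` are the ones listed above.

```java
// Illustrative sketch only; not the implementation from this pull request.
final class NetworkBufferSizing {

    /**
     * Returns fraction * totalJavaMemoryBytes, clamped to [minBytes, maxBytes].
     * All names here are assumptions made for this example.
     */
    static long networkBufferBytes(
            long totalJavaMemoryBytes, float fraction, long minBytes, long maxBytes) {
        if (fraction <= 0.0f || fraction >= 1.0f) {
            throw new IllegalArgumentException("Fraction must be between 0.0 and 1.0: " + fraction);
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("Min must be less than or equal to max.");
        }
        // float arithmetic, so the result may be off by a few bytes
        long byFraction = (long) (fraction * totalJavaMemoryBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    public static void main(String[] args) {
        long min = 64L << 20; // 64 MB
        long max = 1L << 30;  // 1 GB

        // 10% of 256 MB is below the minimum, so the minimum applies: 67108864
        System.out.println(networkBufferBytes(256L << 20, 0.1f, min, max));
        // 10% of 100 GB is above the maximum, so the maximum applies: 1073741824
        System.out.println(networkBufferBytes(100L << 30, 0.1f, min, max));
        // 10% of 2 GB lies between min and max, so the fraction decides
        System.out.println(networkBufferBytes(2L << 30, 0.1f, min, max));
    }
}
```

Setting min and max to the same value fixes the size regardless of the fraction, which matches the "fixed size memory" case exercised in the tests.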

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3721
    
----
commit e61f7bc4debce332c421cb645ff1025b4d03d8d0
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-11T09:26:29Z

    [FLINK-6292] fix transfer.sh upload by using https
    
    Seems the upload via http is not supported anymore.

commit 362ceec0823b179719449d0ed244c591dfcf51f4
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-12T09:09:03Z

    [FLINK-6299] make all IT cases extend from TestLogger
    
    This way, currently executed tests and their failures are properly logged.

commit 973099ef55701fe63951639d37b4f01765b06a01
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-06T12:41:52Z

    [FLINK-4545] replace the network buffers parameter
    
    Instead, allow the configuration with the following three new (more flexible)
    parameters:
     * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
     * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
     * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)
    

commit 09a981189b59ac13bd39000cc77913c0b03289fd
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-11T12:20:40Z

    [hotfix] fix typo in error message

commit 0960a809c8da51b9787f3f726945716933051fc3
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-11T13:29:41Z

    [hotfix] fix typo in taskmanager.sh usage string

commit 298bb69451a1405df774451de11eb5684534c956
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-06T15:58:14Z

    [FLINK-4545] adapt taskmanager.sh to take network buffers memory into account

commit ea2fb24f4a6eb18cc3f8d3ebd83a49c0f1386a8a
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-10T09:43:50Z

    [FLINK-4545] add configuration checks for the new network buffer memory config

commit 5133d250c4dba4a5e72baad95c841d2b03cb49ea
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-10T16:22:10Z

    [FLINK-4545] add unit tests using the new network configuration parameters and methods

commit a24a548e6ff7e36581f7f7457099656362ca3974
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-11T16:52:56Z

    [FLINK-4545] add unit tests for heap size calculation in shell scripts
    
    These verify that the results are the same as in the calculation done by Java.

commit d55153d559bf110a931b5de849df812038ba4a7a
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-12T16:11:37Z

    [FLINK-4545] update the docs with the changed network buffer parameter
    
    Also update the description of taskmanager.memory.fraction: it is not
    relative to the full size of taskmanager.heap.mb, because network buffer
    memory is subtracted first.

commit c48beb0d67e8ef847ef845835e342d4a49127e7d
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-12T16:25:27Z

    [FLINK-4545] fix some tests being killed on Travis CI
    
    Due to the increased defaults for network buffer memory use, some builds on
    Travis CI fail with unit tests being killed. This affects
    * RocksDbBackendEventTimeWindowCheckpointingITCase and
    * HBaseConnectorITCase
    
    We fix this by limiting the maximum amount of network buffer memory to 80MB
    (current defaults would yield 150MB, previously 64MB were used).

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112000165
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
    +	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufOld() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +
    +		// test integer overflow in the memory size
    +		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
    +		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
    +	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufNew() throws Exception {
    +		Configuration config = new Configuration();
    +
    +		// (1) defaults
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config));
    +
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
    +	 * new configuration parameters.
    +	 *
    +	 * @param config configuration object
    +	 */
    +	private static void calculateNetworkBufNew(final Configuration config) {
    +		// (2) fixed size memory
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +		// (3) random fraction, min, and max values
    +		Random ran = new Random();
    +		for (int i = 0; i < 1_000; ++i){
    +			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +			long max = Math.max(min, ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +			long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +			final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
    +			assertTrue(networkBufMem >= min);
    +			assertTrue(networkBufMem <= max);
    +			if (networkBufMem > min && networkBufMem < max) {
    +				assertEquals((long) (javaMem * frac), networkBufMem);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
    +	 * old/new configurations.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufMixed() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +
    +		// old + 1 new parameter = new:
    +		Configuration config1 = config.clone();
    +		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
    +
    +		config1 = config.clone();
    +		long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
    +		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
    +		assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 20), config1));
    +		assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
    +
    +		config1 = config.clone();
    +		long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
    +		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
    +		assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
    +		assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
    +		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
    +
    +		// old + any new parameter = new:
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(TaskManagerServicesConfiguration)}
    +	 * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufFromHeapSize() throws Exception {
    +		PowerMockito.mockStatic(EnvironmentInformation.class);
    +		// some defaults:
    +		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
    +		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
    +
    +		TaskManagerServicesConfiguration tmConfig;
    +
    +		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
    +			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
    +			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
    +		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
    +		assertEquals(100L << 20, TaskManagerServices.calculateNetworkBuf(tmConfig));
    +
    +		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
    +			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
    +		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
    +		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
    +			TaskManagerServices.calculateNetworkBuf(tmConfig));
    +
    +		tmConfig = getTmConfig(-1, 0.1f,
    +			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
    +		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
    +		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
    +			TaskManagerServices.calculateNetworkBuf(tmConfig));
    +	}
    +
    +	/**
    +	 * Returns a task manager services configuration for the tests
    +	 *
    +	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
    +	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
    +	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
    +	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
    +	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
    +	 * @param memType				on-heap or off-heap
    +	 *
    +	 * @return configuration object
    +	 */
    +	private static TaskManagerServicesConfiguration getTmConfig(
    +		final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
    +		long networkBufMin, long networkBufMax,
    +		final MemoryType memType) {
    +
    +		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
    +			networkBufFraction,
    +			networkBufMin,
    +			networkBufMax,
    +			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
    +			memType,
    +			null,
    +			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
    +			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
    +			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
    +			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
    +			null);
    +
    +		return new TaskManagerServicesConfiguration(
    +			mock(InetAddress.class),
    +			new String[] {},
    +			networkConfig,
    +			QueryableStateConfiguration.disabled(),
    +			1,
    +			managedMemory,
    +			false,
    +			managedMemoryFraction,
    +			mock(MetricRegistryConfiguration.class),
    +			0);
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
    +	 * manually calculated scenarios.
    +	 */
    +	@Test
    +	public void calculateHeapSizeMB() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
    +
    +		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
    +		assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
    +
    +		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
    +		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
    +		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
    +
    +		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
    +		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
    +		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
    --- End diff --
    
    Now that I see this test, it got me thinking: the managedMemory and networkBuffersMemory fractions do not work on the same base value; i.e., if both are set to 0.5, then one (the network one, I think) gets 0.5 of the total memory, while the managedMemory gets 0.25. I'm wondering how intuitive this is; they behave similarly when used alone, but when both are used, 0.5 doesn't equal 0.5, in a way.
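
To make the arithmetic behind this comment concrete, here is a small worked sketch. It assumes, as the `calculateHeapSizeMB` test cases and the docs commit in this pull request suggest, that network buffer memory is taken from the total first and the managed memory fraction is then applied to what remains. The min/max clamping of the network buffers is ignored here, and all names are hypothetical.

```java
// Illustrative sketch only; names are assumptions, not Flink APIs.
final class FractionBases {

    /** Network buffers take their fraction of the total memory. */
    static long networkMb(long totalMb, double networkFraction) {
        return (long) (totalMb * networkFraction);
    }

    /** Managed memory takes its fraction of what is left after network buffers. */
    static long managedMb(long totalMb, double networkFraction, double managedFraction) {
        long remainingMb = totalMb - networkMb(totalMb, networkFraction);
        return (long) (remainingMb * managedFraction);
    }

    public static void main(String[] args) {
        long totalMb = 1000;

        // Both fractions set to 0.5: network gets 500 MB (0.5 of the total),
        // managed memory gets 250 MB (0.25 of the total).
        System.out.println(networkMb(totalMb, 0.5));      // 500
        System.out.println(managedMb(totalMb, 0.5, 0.5)); // 250

        // Both fractions set to 0.1, as in the last test case above:
        // 100 MB network + 90 MB managed, leaving 810 MB.
        long network = networkMb(totalMb, 0.1);
        long managed = managedMb(totalMb, 0.1, 0.1);
        System.out.println(totalMb - network - managed);  // 810
    }
}
```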



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112246505
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
     		}
     		catch (Throwable t) {
     			if (LOG.isErrorEnabled()) {
    -				LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
    +				LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
    --- End diff --
    
    Sorry, should I create a separate PR? (A separate JIRA is definitely overkill for this.)



[GitHub] flink issue #3721: [FLINK-4545] replace the network buffers parameter

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3721
  
    Okay, taking a step back. Looking through the code some more, the internal arithmetic should certainly stay in bytes. However, bytes are tedious to configure.
    
    I suggest adding support for interpreting memory units to the configuration, so that we can configure values such as
      - 512m
      - 10 kb
      - ...
    
    I have started some utility here: https://github.com/StephanEwen/incubator-flink/tree/mem_size
    
    That means that we would keep the PR in this form and add memory configuration parsing as a followup.
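
For readers wondering what such unit parsing could look like, the following is a minimal sketch. It is not the utility from the branch linked above; the class name, the method name, and the set of accepted suffixes are assumptions made for this example. It keeps the internal value in bytes, as suggested.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only; not the utility referenced in this comment.
final class MemorySizeSketch {

    // digits, optional whitespace, optional unit suffix (e.g. "512m", "10 kb")
    private static final Pattern SIZE = Pattern.compile("(\\d+)\\s*([a-z]*)");

    /** Parses a size such as "512m" or "10 kb" into a number of bytes. */
    static long parseBytes(String text) {
        String normalized = text.trim().toLowerCase(Locale.ROOT);
        Matcher matcher = SIZE.matcher(normalized);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a memory size: '" + text + "'");
        }
        long value = Long.parseLong(matcher.group(1));
        long multiplier;
        switch (matcher.group(2)) {
            case "":
            case "b":
                multiplier = 1L;
                break;
            case "k":
            case "kb":
                multiplier = 1L << 10;
                break;
            case "m":
            case "mb":
                multiplier = 1L << 20;
                break;
            case "g":
            case "gb":
                multiplier = 1L << 30;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit in: '" + text + "'");
        }
        // fail loudly instead of silently overflowing
        return Math.multiplyExact(value, multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("512m"));  // 536870912
        System.out.println(parseBytes("10 kb")); // 10240
        System.out.println(parseBytes("1024"));  // 1024
    }
}
```

Treating a bare number as bytes is one possible choice; it keeps byte precision available for parameters that need it.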



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112239687
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
    +	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufOld() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +
    +		// test integer overflow in the memory size
    +		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
    +		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
    +	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufNew() throws Exception {
    +		Configuration config = new Configuration();
    +
    +		// (1) defaults
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config));
    +
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
    +	 * new configuration parameters.
    +	 *
    +	 * @param config configuration object
    +	 */
    +	private static void calculateNetworkBufNew(final Configuration config) {
    +		// (2) fixed size memory
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +		// (3) random fraction, min, and max values
    +		Random ran = new Random();
    +		for (int i = 0; i < 1_000; ++i){
    +			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +			long max = Math.max(min, ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +			long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +			final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
    --- End diff --
    
    Random testing is quite powerful for identifying unforeseen errors, especially since these inputs are user-configurable. I'll keep that but add better error messages as suggested.
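
One way such a randomized check with more helpful error messages could look is sketched below. This is not the test code from the pull request: the function under test is a local stand-in, the value ranges are bounded to avoid overflow, and all names are hypothetical. The idea is to put the seed and every generated input into the failure message so that a failing run can be reproduced.

```java
import java.util.Random;

// Illustrative sketch only; the method under test is a local stand-in.
final class RandomizedClampCheck {

    /** Stand-in for the calculation under test: fraction of total, clamped to [min, max]. */
    static long clamp(long totalBytes, float fraction, long minBytes, long maxBytes) {
        return Math.min(maxBytes, Math.max(minBytes, (long) (fraction * totalBytes)));
    }

    /** Runs the randomized check; throws AssertionError with full context on failure. */
    static void check(long seed, int iterations) {
        Random random = new Random(seed);
        for (int i = 0; i < iterations; i++) {
            float frac = Math.max(random.nextFloat(), Float.MIN_VALUE);
            // bounded ranges keep min <= max < javaMem without overflow
            long min = 1L + random.nextInt(1 << 30);
            long max = min + random.nextInt(1 << 30);
            long javaMem = max + 1L + random.nextInt(1 << 30);

            long result = clamp(javaMem, frac, min, max);
            String context = String.format(
                "seed=%d, iteration=%d, frac=%s, min=%d, max=%d, javaMem=%d, result=%d",
                seed, i, frac, min, max, javaMem, result);

            if (result < min) {
                throw new AssertionError("Result below configured minimum: " + context);
            }
            if (result > max) {
                throw new AssertionError("Result above configured maximum: " + context);
            }
            if (result > min && result < max && result != (long) (frac * javaMem)) {
                throw new AssertionError("Result does not match fraction: " + context);
            }
        }
    }

    public static void main(String[] args) {
        long seed = System.nanoTime();
        check(seed, 1_000);
        System.out.println("All checks passed for seed " + seed);
    }
}
```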



[GitHub] flink issue #3721: [FLINK-4545] replace the network buffers parameter

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3721
  
    Merging this.
    I filed a follow-up JIRA to address the "configuration with units" to make sure all memory-related parameters behave the same way, without loss of byte precision where needed: https://issues.apache.org/jira/browse/FLINK-6469



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111975142
  
    --- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
    @@ -398,3 +428,106 @@ readSlaves() {
     useOffHeapMemory() {
         [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
     }
    +
    +HAVE_AWK=
    +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
    +calculateNetworkBuf() {
    +    local network_buffers_bytes
    +    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
    +        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
    +        exit 1
    +    fi
    +
    +    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +        # fix memory size for network buffers
    +        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
    +    else
    +        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
    +            echo "Min must be less than or equal to max."
    +            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        # Bash only performs integer arithmetic so floating point computation is performed using awk
    +        if [[ -z "${HAVE_AWK}" ]] ; then
    +            command -v awk >/dev/null 2>&1
    +            if [[ $? -ne 0 ]]; then
    +                echo "[ERROR] Program 'awk' not found."
    +                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +                exit 1
    +            fi
    +            HAVE_AWK=true
    +        fi
    +
    +        # We calculate the memory using a fraction of the total memory
    +        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
    +            echo "It must be between 0.0 and 1.0."
    +            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
    +    fi
    +
    +    # recalculate the JVM heap memory by taking the network buffers into account
    --- End diff --
    
    This is more of a verification, isn't it?
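
For readers following the quoted `calculateNetworkBuf()` shell function, the sizing rule it applies can be sketched in Java as below. This is a minimal sketch, not the code from the pull request: the class `NetworkBufferSizing` and the method `networkBufferBytes` are hypothetical names chosen for illustration, and the sketch truncates the result whereas the awk `printf "%.0f"` in the script rounds it.

```java
final class NetworkBufferSizing {

    /**
     * Hypothetical helper (not Flink's API): returns the network buffer memory in bytes,
     * i.e. fraction * totalBytes limited to the range [minBytes, maxBytes].
     */
    static long networkBufferBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("min must be less than or equal to max");
        }
        if (minBytes == maxBytes) {
            // fixed memory size: the fraction is not consulted at all
            return minBytes;
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0 (exclusive)");
        }
        // the cast truncates; the shell script rounds via printf "%.0f"
        long byFraction = (long) (totalBytes * fraction);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        long gb = 1L << 30;
        // fraction applies, lower bound applies, upper bound applies
        System.out.println(networkBufferBytes(1000 * mb, 0.25, 64 * mb, gb));
        System.out.println(networkBufferBytes(100 * mb, 0.25, 64 * mb, gb));
        System.out.println(networkBufferBytes(8 * gb, 0.25, 64 * mb, gb));
    }
}
```

The order of the checks mirrors the script: equal min and max short-circuit to a fixed size, so an invalid fraction is only reported when the fraction is actually needed.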


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/3721
  
    The two tests that failed on Travis CI were simply killed: a "`Killed`" appeared in their logs, which usually indicates that memory ran out and the kernel killed a process.



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112003041
  
    --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
    @@ -0,0 +1,306 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.dist;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
    +import org.apache.flink.util.OperatingSystem;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Random;
    +
    +import static org.hamcrest.CoreMatchers.allOf;
    +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
    +import static org.hamcrest.Matchers.lessThanOrEqualTo;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Unit test that verifies that the task manager heap size calculation used by the bash script
    + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
    + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
    + *
    + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
    + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
    + * not need high precision.
    + */
    +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
    +
    +	/** Key that is used by <tt>config.sh</tt>. */
    +	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
    +
    +	/**
    +	 * Number of tests with random values.
    +	 *
    +	 * NOTE: calling the external test script is slow and thus low numbers are preferred for general
    +	 * testing.
    +	 */
    +	private static final int NUM_RANDOM_TESTS = 20;
    +
    +	@Before
    +	public void checkOperatingSystem() {
    +		Assume.assumeTrue("This test checks shell scripts not available on Windows.",
    +			!OperatingSystem.isWindows());
    +	}
    +
    +	/**
    +	 * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
    +	 * result as the shell script.
    +	 */
    +	@Test
    +	public void compareNetworkBufShellScriptWithJava() throws Exception {
    +		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
    +		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
    +
    +		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
    +
    +		// some automated tests with random (but valid) values
    +
    +		Random ran = new Random();
    +		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
    +			// tolerate that values differ by 1% (due to different floating point precisions)
    +			compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
    +		}
    +	}
    +
    +	/**
    +	 * Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same
    +	 * result as the shell script.
    +	 */
    +	@Test
    +	public void compareHeapSizeShellScriptWithJava() throws Exception {
    +		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
    +		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
    +
    +		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
    +
    +		compareHeapSizeJavaVsScript(
    +			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
    +
    +		compareHeapSizeJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
    +
    +		compareHeapSizeJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
    +
    +		// some automated tests with random (but valid) values
    +
    +		Random ran = new Random();
    +		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
    +			// tolerate that values differ by 1% (due to different floating point precisions)
    +			compareHeapSizeJavaVsScript(getRandomConfig(ran), 0.01f);
    +		}
    +	}
    +
    +	/**
    +	 * Returns a flink configuration object with the given values.
    +	 *
    +	 * @param javaMemMB
    +	 * 		total JVM memory to use (in megabytes)
    +	 * @param useOffHeap
    +	 * 		whether to use off-heap memory (<tt>true</tt>) or not (<tt>false</tt>)
    +	 * @param netBufMemFrac
    +	 * 		fraction of JVM memory to use for network buffers
    +	 * @param netBufMemMin
    +	 * 		minimum memory size for network buffers (in bytes)
    +	 * @param netBufMemMax
    +	 * 		maximum memory size for network buffers (in bytes)
    +	 * @param managedMemSizeMB
    +	 * 		amount of managed memory (in megabytes)
    +	 * @param managedMemFrac
    +	 * 		fraction of free memory to use for managed memory (if <tt>managedMemSizeMB</tt> is
    +	 * 		<tt>-1</tt>)
    +	 *
    +	 * @return flink configuration
    +	 */
    +	private static Configuration getConfig(
    +			final int javaMemMB, final boolean useOffHeap, final float netBufMemFrac,
    +			final long netBufMemMin, final long netBufMemMax, final int managedMemSizeMB,
    +			final float managedMemFrac) {
    +
    +		Configuration config = new Configuration();
    +
    +		config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
    +		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
    +
    +		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin);
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax);
    +
    +		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB);
    +		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
    +
    +		return config;
    +	}
    +
    +	/**
    +	 * Returns a flink configuration object with random values (only those relevant to the tests in
    +	 * this class.
    +	 *
    +	 * @param ran  random number generator
    +	 *
    +	 * @return flink configuration
    +	 */
    +	private static Configuration getRandomConfig(final Random ran) {
    +
    +		float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +
    +//		long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    --- End diff --
    
    These lines can be removed, I guess.



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112246105
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
    +	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufOld() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +
    +		// test integer overflow in the memory size
    +		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
    +		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
    +	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufNew() throws Exception {
    +		Configuration config = new Configuration();
    +
    +		// (1) defaults
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config));
    +
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
    +	 * new configuration parameters.
    +	 *
    +	 * @param config configuration object
    +	 */
    +	private static void calculateNetworkBufNew(final Configuration config) {
    +		// (2) fixed size memory
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +		// (3) random fraction, min, and max values
    +		Random ran = new Random();
    +		for (int i = 0; i < 1_000; ++i){
    +			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +			long max = Math.max(min, ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +			long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +			final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
    +			assertTrue(networkBufMem >= min);
    +			assertTrue(networkBufMem <= max);
    +			if (networkBufMem > min && networkBufMem < max) {
    +				assertEquals((long) (javaMem * frac), networkBufMem);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
    +	 * old/new configurations.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufMixed() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +
    +		// old + 1 new parameter = new:
    +		Configuration config1 = config.clone();
    +		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
    +
    +		config1 = config.clone();
    +		long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
    +		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
    +		assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 20), config1));
    +		assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
    +
    +		config1 = config.clone();
    +		long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
    +		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
    +		assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
    +		assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
    +		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
    +
    +		// old + any new parameter = new:
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(TaskManagerServicesConfiguration)}
    +	 * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufFromHeapSize() throws Exception {
    +		PowerMockito.mockStatic(EnvironmentInformation.class);
    +		// some defaults:
    +		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
    +		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
    +
    +		TaskManagerServicesConfiguration tmConfig;
    +
    +		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
    +			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
    +			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
    +		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
    +		assertEquals(100L << 20, TaskManagerServices.calculateNetworkBuf(tmConfig));
    +
    +		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
    +			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
    +		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
    +		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
    +			TaskManagerServices.calculateNetworkBuf(tmConfig));
    +
    +		tmConfig = getTmConfig(-1, 0.1f,
    +			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
    +		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
    +		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
    +			TaskManagerServices.calculateNetworkBuf(tmConfig));
    +	}
    +
    +	/**
    +	 * Returns a task manager services configuration for the tests
    +	 *
    +	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
    +	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
    +	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
    +	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
    +	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
    +	 * @param memType				on-heap or off-heap
    +	 *
    +	 * @return configuration object
    +	 */
    +	private static TaskManagerServicesConfiguration getTmConfig(
    +		final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
    +		long networkBufMin, long networkBufMax,
    +		final MemoryType memType) {
    +
    +		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
    +			networkBufFraction,
    +			networkBufMin,
    +			networkBufMax,
    +			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
    +			memType,
    +			null,
    +			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
    +			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
    +			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
    +			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
    +			null);
    +
    +		return new TaskManagerServicesConfiguration(
    +			mock(InetAddress.class),
    +			new String[] {},
    +			networkConfig,
    +			QueryableStateConfiguration.disabled(),
    +			1,
    +			managedMemory,
    +			false,
    +			managedMemoryFraction,
    +			mock(MetricRegistryConfiguration.class),
    +			0);
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
    +	 * manually calculated scenarios.
    +	 */
    +	@Test
    +	public void calculateHeapSizeMB() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
    +
    +		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
    +		assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
    +
    +		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
    +		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
    +		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
    +
    +		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
    +		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
    +		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
    --- End diff --
    
    Yes, unfortunately that is the case, and it has always been that way (back when the network buffers were a fixed amount of memory rather than a fraction), but it is now also properly documented.
    
    With the possibility of specifying min and max values for the network buffer memory size, the actual fraction may differ from the configured one, and we don't really want to fail jobs because we can't guarantee the configured managed memory fraction in that case, do we?
    As a side note, this is also the safest way to ensure that the invariants hold, especially for the 0.5 vs. 0.5 example: inside Java, we always allocate the network buffer memory first and then determine the remaining free heap space (if on-heap), from which we use the given fraction.
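
The ordering described above (network buffers first, managed memory taken from what remains) can be illustrated with a small worked example. This is only a sketch under stated assumptions: `HeapSizeSketch` and `heapSizeMB` are hypothetical names, the network buffer size is passed in as an already computed number of megabytes, and the on-heap branch simply reproduces the `1000` expected by the quoted `calculateHeapSizeMB()` test rather than any documented rule.

```java
final class HeapSizeSketch {

    /**
     * Hypothetical helper (not Flink's API): JVM heap size in MB once network buffers and
     * managed memory have been accounted for.
     *
     * @param managedMB fixed managed memory in MB, or a value <= 0 to use managedFraction
     */
    static long heapSizeMB(
            long totalMB, long networkBufMB, boolean offHeap, long managedMB, double managedFraction) {
        if (!offHeap) {
            // assumption taken from the quoted test: on-heap leaves the configured size untouched
            return totalMB;
        }
        // network buffers are subtracted first ...
        long remainingMB = totalMB - networkBufMB;
        // ... and the managed memory fraction applies to the remainder only
        long managed = managedMB > 0 ? managedMB : (long) (remainingMB * managedFraction);
        return remainingMB - managed;
    }

    public static void main(String[] args) {
        // the three manual scenarios from the quoted test (network buffers: 10% of 1000 MB)
        System.out.println(heapSizeMB(1000, 100, false, -1, 0.7));
        System.out.println(heapSizeMB(1000, 100, true, 10, 0.7));
        System.out.println(heapSizeMB(1000, 100, true, -1, 0.1));
        // the "0.5 vs. 0.5" case: both fractions fit because the second applies to the rest
        System.out.println(heapSizeMB(1000, 500, true, -1, 0.5));
    }
}
```

Because the managed memory fraction is applied to the remainder, two fractions of 0.5 never claim the whole memory between them: in this sketch 500 MB go to network buffers, 250 MB to managed memory, and 250 MB stay on the heap.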



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112002748
  
    --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
    @@ -0,0 +1,306 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.dist;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
    +import org.apache.flink.util.OperatingSystem;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Random;
    +
    +import static org.hamcrest.CoreMatchers.allOf;
    +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
    +import static org.hamcrest.Matchers.lessThanOrEqualTo;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Unit test that verifies that the task manager heap size calculation used by the bash script
    + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
    + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
    + *
    + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
    + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
    + * not need high precision.
    + */
    +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
    +
    +	/** Key that is used by <tt>config.sh</tt>. */
    +	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
    +
    +	/**
    +	 * Number of tests with random values.
    +	 *
    +	 * NOTE: calling the external test script is slow and thus low numbers are preferred for general
    +	 * testing.
    +	 */
    +	private static final int NUM_RANDOM_TESTS = 20;
    +
    +	@Before
    +	public void checkOperatingSystem() {
    +		Assume.assumeTrue("This test checks shell scripts not available on Windows.",
    +			!OperatingSystem.isWindows());
    +	}
    +
    +	/**
    +	 * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
    +	 * result as the shell script.
    +	 */
    +	@Test
    +	public void compareNetworkBufShellScriptWithJava() throws Exception {
    +		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
    +		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
    +
    +		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
    +
    +		// some automated tests with random (but valid) values
    +
    +		Random ran = new Random();
    +		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
    +			// tolerate that values differ by 1% (due to different floating point precisions)
    +			compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
    --- End diff --
    
    As with the other randomized tests, we should print the configuration used for the test in case of failure.
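
One way to follow this suggestion is to carry the random seed and the generated values into the assertion message, so that a failing run can be reproduced. The sketch below is illustrative only and does not use JUnit: `RandomizedCheckSketch`, `Settings`, `randomSettings`, and `assertWithinTolerance` are hypothetical names, and the value ranges are assumptions rather than the ranges used in the pull request.

```java
import java.util.Random;

final class RandomizedCheckSketch {

    /** Hypothetical stand-in for the randomly generated configuration values. */
    static final class Settings {
        final float fraction;
        final long minBytes;
        final long maxBytes;

        Settings(float fraction, long minBytes, long maxBytes) {
            this.fraction = fraction;
            this.minBytes = minBytes;
            this.maxBytes = maxBytes;
        }

        @Override
        public String toString() {
            return "Settings{fraction=" + fraction + ", minBytes=" + minBytes + ", maxBytes=" + maxBytes + "}";
        }
    }

    /** Draws random but valid values (the ranges are assumptions made for this sketch). */
    static Settings randomSettings(Random ran) {
        float fraction = Math.max(ran.nextFloat(), Float.MIN_VALUE);
        long min = 1L + (long) (ran.nextDouble() * (1L << 30));
        long max = min + (long) (ran.nextDouble() * (1L << 30));
        return new Settings(fraction, min, max);
    }

    /** Fails with a message that names the context (seed and settings) of the failing run. */
    static void assertWithinTolerance(long expected, long actual, double tolerance, Object context) {
        double allowed = Math.abs(expected) * tolerance;
        if (Math.abs(expected - actual) > allowed) {
            throw new AssertionError("expected " + expected + " but got " + actual
                + " (tolerance " + tolerance + ") for " + context);
        }
    }

    public static void main(String[] args) {
        long seed = 42L; // a fixed, reported seed makes a failing run reproducible
        Random ran = new Random(seed);
        long totalBytes = 1000L << 20;
        for (int i = 0; i < 20; ++i) {
            Settings s = randomSettings(ran);
            long viaDouble = (long) (totalBytes * (double) s.fraction);
            long viaFloat = (long) (totalBytes * s.fraction);
            assertWithinTolerance(viaDouble, viaFloat, 0.01, "seed=" + seed + ", " + s);
        }
        System.out.println("all random settings within tolerance");
    }
}
```

With JUnit, the same effect is available through the assertion overloads that take a message as their first argument.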



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112001266
  
    --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
    @@ -0,0 +1,306 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.dist;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
    +import org.apache.flink.util.OperatingSystem;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Random;
    +
    +import static org.hamcrest.CoreMatchers.allOf;
    +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
    +import static org.hamcrest.Matchers.lessThanOrEqualTo;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Unit test that verifies that the task manager heap size calculation used by the bash script
    + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
    + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
    + *
    + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
    + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
    + * not need high precision.
    + */
    +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
    +
    +	/** Key that is used by <tt>config.sh</tt>. */
    +	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
    +
    +	/**
    +	 * Number of tests with random values.
    +	 *
    +	 * NOTE: calling the external test script is slow and thus low numbers are preferred for general
    +	 * testing.
    +	 */
    +	private static final int NUM_RANDOM_TESTS = 20;
    +
    +	@Before
    +	public void checkOperatingSystem() {
    +		Assume.assumeTrue("This test checks shell scripts not available on Windows.",
    --- End diff --
    
    missing "that are"?


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112236022
  
    --- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
    @@ -398,3 +428,106 @@ readSlaves() {
     useOffHeapMemory() {
         [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
     }
    +
    +HAVE_AWK=
    +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
    +calculateNetworkBuf() {
    +    local network_buffers_bytes
    +    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
    +        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
    +        exit 1
    +    fi
    +
    +    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +        # fix memory size for network buffers
    +        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
    +    else
    +        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
    +            echo "Min must be less than or equal to max."
    +            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        # Bash only performs integer arithmetic so floating point computation is performed using awk
    +        if [[ -z "${HAVE_AWK}" ]] ; then
    +            command -v awk >/dev/null 2>&1
    +            if [[ $? -ne 0 ]]; then
    +                echo "[ERROR] Program 'awk' not found."
    +                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +                exit 1
    +            fi
    +            HAVE_AWK=true
    +        fi
    +
    +        # We calculate the memory using a fraction of the total memory
    +        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
    +            echo "It must be between 0.0 and 1.0."
    +            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
    +    fi
    +
    +    # recalculate the JVM heap memory by taking the network buffers into account
    --- End diff --
    
    what do you mean?


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112000380
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
     		}
     		catch (Throwable t) {
     			if (LOG.isErrorEnabled()) {
    -				LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
    +				LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
    --- End diff --
    
    Let's see whether I can remember to not squash this commit :)


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112242996
  
    --- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
    @@ -398,3 +428,106 @@ readSlaves() {
     useOffHeapMemory() {
         [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
     }
    +
    +HAVE_AWK=
    +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
    +calculateNetworkBuf() {
    +    local network_buffers_bytes
    +    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
    +        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
    +        exit 1
    +    fi
    +
    +    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +        # fix memory size for network buffers
    +        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
    +    else
    +        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
    +            echo "Min must be less than or equal to max."
    +            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        # Bash only performs integer arithmetic so floating point computation is performed using awk
    +        if [[ -z "${HAVE_AWK}" ]] ; then
    +            command -v awk >/dev/null 2>&1
    +            if [[ $? -ne 0 ]]; then
    +                echo "[ERROR] Program 'awk' not found."
    +                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +                exit 1
    +            fi
    +            HAVE_AWK=true
    +        fi
    +
    +        # We calculate the memory using a fraction of the total memory
    +        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
    +            echo "It must be between 0.0 and 1.0."
    +            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
    +    fi
    +
    +    # recalculate the JVM heap memory by taking the network buffers into account
    --- End diff --
    
    To me, "recalculate" implied that it would change some configuration value, but that's not happening. It's only verifying that the memory for network buffers is less than the heap memory.


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112247760
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
     		}
     		catch (Throwable t) {
     			if (LOG.isErrorEnabled()) {
    -				LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
    +				LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
    --- End diff --
    
    You can leave it as it is.


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111983754
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -410,24 +410,47 @@ private static NetworkEnvironment createNetworkEnvironment(
     	 *
     	 * @return memory to use for network buffers (in bytes)
     	 */
    +	@SuppressWarnings("deprecation")
     	public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
    +		assert totalJavaMemorySize > 0;
    --- End diff --
    
    use Preconditions instead?
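
    To illustrate the suggestion: a Java `assert` is skipped unless the JVM runs with `-ea`, whereas an explicit precondition check always runs. Below is a minimal sketch, assuming a small local `checkArgument` helper in place of a Preconditions-style utility. The names `PreconditionSketch`, `checkArgument`, and `requirePositive` are hypothetical and not taken from the PR.

```java
// Sketch only: PreconditionSketch and its methods are hypothetical stand-ins,
// not the utility class or the code from the pull request.
class PreconditionSketch {

    /** Throws IllegalArgumentException if the condition does not hold. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Validates the input unconditionally; an `assert` would only run with -ea. */
    static long requirePositive(long totalJavaMemorySize) {
        checkArgument(totalJavaMemorySize > 0, "totalJavaMemorySize must be positive");
        return totalJavaMemorySize;
    }

    public static void main(String[] args) {
        System.out.println(requirePositive(64L << 20));
        try {
            requirePositive(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    The explicit check costs one comparison per call and fails the same way in tests and in production, which is presumably the motivation for the suggestion.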


---

[GitHub] flink issue #3721: [FLINK-4545] replace the network buffers parameter

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3721
  
    I am checking this PR out now...


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111969982
  
    --- Diff: docs/setup/config.md ---
    @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated
     
     ## Background
     
    +
     ### Configuring the Network Buffers
     
    -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
    +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
    --- End diff --
    
    AFAIK those things only happen if you leave trailing whitespace at the end of the line or similar. Here, the lines are properly joined in the rendered HTML.


---

[GitHub] flink issue #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3721
  
    I've merged the two PRs that this one builds upon; could you rebase this one?


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111992933
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
    +	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufOld() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +
    +		// test integer overflow in the memory size
    +		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
    +		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
    +	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufNew() throws Exception {
    +		Configuration config = new Configuration();
    +
    +		// (1) defaults
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config));
    +
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
    +	 * new configuration parameters.
    +	 *
    +	 * @param config configuration object
    +	 */
    +	private static void calculateNetworkBufNew(final Configuration config) {
    +		// (2) fixed size memory
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +		// (3) random fraction, min, and max values
    +		Random ran = new Random();
    +		for (int i = 0; i < 1_000; ++i){
    +			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +			long max = Math.max(min, ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +			long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +			final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
    --- End diff --
    
    What we definitely need here is a catch block that prints the used parameters if any assertion fails. It is a bit odd to use random parameters in the first place though :/
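
    One way to do this is to wrap the assertions of each random iteration and rethrow with the generated values attached. The following is a minimal sketch under stated assumptions: `RandomizedCheckSketch`, `clampFraction`, `describe`, and `checkOnce` are hypothetical names, and `clampFraction` is only a stand-in for the method under test, not the PR's implementation.

```java
import java.util.Random;

// Sketch only: all names here are hypothetical and not taken from the PR.
class RandomizedCheckSketch {

    /** Stand-in for the computation under test: clamp frac * total to [min, max]. */
    static long clampFraction(long total, float frac, long min, long max) {
        return Math.min(max, Math.max(min, (long) (frac * total)));
    }

    /** Formats the parameters of one iteration so that a failure can be reproduced. */
    static String describe(float frac, long min, long max, long total) {
        return String.format("frac=%s, min=%d, max=%d, total=%d", frac, min, max, total);
    }

    /** Runs one randomized check; on failure, rethrows with the parameters attached. */
    static void checkOnce(Random ran) {
        float frac = ran.nextFloat();
        long min = 1 + ran.nextInt(1 << 20);
        long max = min + ran.nextInt(1 << 20);
        long total = max + 1 + ran.nextInt(1 << 20);
        try {
            long result = clampFraction(total, frac, min, max);
            if (result < min || result > max) {
                throw new AssertionError("result " + result + " out of range");
            }
        } catch (AssertionError e) {
            // keep the original failure as the cause and add the inputs to the message
            throw new AssertionError(describe(frac, min, max, total), e);
        }
    }

    public static void main(String[] args) {
        Random ran = new Random();
        for (int i = 0; i < 1_000; ++i) {
            checkOnce(ran);
        }
        System.out.println("all iterations passed");
    }
}
```

    Logging the seed of the `Random` instance instead would also make a failing run reproducible, at the cost of having to re-run the whole loop.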


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111966911
  
    --- Diff: docs/setup/config.md ---
    @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated
     
     ## Background
     
    +
     ### Configuring the Network Buffers
     
    -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
    +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
    --- End diff --
    
    We usually don't do manual line breaks in the documentation; otherwise if you resize the window funky things start to happen.


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3721


---

[GitHub] flink issue #3721: [FLINK-4545] replace the network buffers parameter

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3721
  
    Code is good in general and well tested (including the shell scripts, which is great!)
    
    I would do some on-the-fly polishing while merging. The main thing I want to adjust is having the configuration parameters specified in megabytes, not bytes. All other memory-related parameters are in megabytes, so this one should be as well, for consistency. Also, I think we don't need a finer granularity these days.


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112248789
  
    --- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
    @@ -398,3 +428,106 @@ readSlaves() {
     useOffHeapMemory() {
         [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
     }
    +
    +HAVE_AWK=
    +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
    +calculateNetworkBuf() {
    +    local network_buffers_bytes
    +    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
    +        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
    +        exit 1
    +    fi
    +
    +    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +        # fix memory size for network buffers
    +        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
    +    else
    +        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
    +            echo "Min must be less than or equal to max."
    +            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        # Bash only performs integer arithmetic so floating point computation is performed using awk
    +        if [[ -z "${HAVE_AWK}" ]] ; then
    +            command -v awk >/dev/null 2>&1
    +            if [[ $? -ne 0 ]]; then
    +                echo "[ERROR] Program 'awk' not found."
    +                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +                exit 1
    +            fi
    +            HAVE_AWK=true
    +        fi
    +
    +        # We calculate the memory using a fraction of the total memory
    +        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
    +            echo "It must be between 0.0 and 1.0."
    +            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
    +    fi
    +
    +    # recalculate the JVM heap memory by taking the network buffers into account
    --- End diff --
    
    No, actually: the user may set the `FLINK_TM_HEAP` environment variable or configure the "Flink heap size" via `taskmanager.heap.mb`, but this is not the real heap size. It is rather the overall memory size used by Flink (including off-heap memory). So this function removes the off-heap part and returns the real heap size to use with `-Xmx` and `-Xms`.
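
    The arithmetic described here can be sketched roughly as follows. This is a simplified illustration, assuming the network buffers are the only off-heap part (it ignores off-heap managed memory) and that min/max are given in bytes as in the diff above. `HeapSizeSketch`, `networkBufBytes`, and `heapSizeMB` are hypothetical names, not the functions from the PR.

```java
// Sketch only: simplified and hypothetical, not the PR's implementation.
class HeapSizeSketch {

    /** Network buffer memory: a fraction of the total, clamped to [minBytes, maxBytes]. */
    static long networkBufBytes(long totalBytes, float fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Heap size in MB (for -Xmx/-Xms): configured total minus the network buffers. */
    static long heapSizeMB(long totalMB, float fraction, long minBytes, long maxBytes) {
        long totalBytes = totalMB << 20;
        long netBuf = networkBufBytes(totalBytes, fraction, minBytes, maxBytes);
        return (totalBytes - netBuf) >> 20;
    }

    public static void main(String[] args) {
        // defaults named in the PR description: fraction 0.1, min 64 MB, max 1 GB
        System.out.println(heapSizeMB(1024, 0.1f, 64L << 20, 1L << 30));
    }
}
```

    With a configured total of 100 MB and a fraction of 0.5, for example, the fraction alone would yield 50 MB, the 64 MB minimum takes over, and 36 MB remain for the JVM heap.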


---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112242842
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
    +	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufOld() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +
    +		// test integer overflow in the memory size
    +		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
    +		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
    +	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufNew() throws Exception {
    +		Configuration config = new Configuration();
    +
    +		// (1) defaults
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config));
    +
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
    +	 * new configuration parameters.
    +	 *
    +	 * @param config configuration object
    +	 */
    +	private static void calculateNetworkBufNew(final Configuration config) {
    +		// (2) fixed size memory
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +		// (3) random fraction, min, and max values
    +		Random ran = new Random();
    +		for (int i = 0; i < 1_000; ++i){
    +			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +			long max = Math.max(min, ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +			long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +			final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
    +			assertTrue(networkBufMem >= min);
    +			assertTrue(networkBufMem <= max);
    +			if (networkBufMem > min && networkBufMem < max) {
    +				assertEquals((long) (javaMem * frac), networkBufMem);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
    +	 * old/new configurations.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufMixed() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +
    +		// old + 1 new parameter = new:
    +		Configuration config1 = config.clone();
    +		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
    --- End diff --
    
    Yes, if we define our own test defaults, you are right. However, that would decouple the test from the defaults that are set for real-world applications, and I wanted to keep the two as close as possible, including any future change in the default values.
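
    To illustrate the trade-off, here is a minimal, self-contained sketch of a test helper that stays coupled to the defaults while keeping the min/max calls out of the assertions. The class name, the `expectedNetworkBuf` helper and the three constants are assumptions made for this example (the constants copy the defaults listed in the PR description); the real test would read them from `TaskManagerOptions` instead.

```java
final class DefaultBoundsSketch {

    // Hypothetical stand-ins for the option defaults (0.1, 64 MB, 1 GB per the PR description).
    static final float DEFAULT_FRACTION = 0.1f;
    static final long DEFAULT_MIN = 64L << 20;
    static final long DEFAULT_MAX = 1L << 30;

    /** Expected size: the default fraction of the total, clamped to [DEFAULT_MIN, DEFAULT_MAX]. */
    static long expectedNetworkBuf(long totalJavaMemorySize) {
        long byFraction = (long) (DEFAULT_FRACTION * totalJavaMemorySize);
        return Math.min(DEFAULT_MAX, Math.max(DEFAULT_MIN, byFraction));
    }

    public static void main(String[] args) {
        // 10 MB total: the fraction is far below the minimum, so the minimum applies.
        System.out.println(expectedNetworkBuf(10L << 20) == DEFAULT_MIN);
        // 10 GB total: the fraction is at or above the maximum, so the maximum applies.
        System.out.println(expectedNetworkBuf(10L << 30) == DEFAULT_MAX);
    }
}
```

    If the defaults change, only the constants (or, in the real test, nothing at all) need to follow; the assertions keep their shape.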


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111997260
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
    +	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufOld() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +			TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +
    +		// test integer overflow in the memory size
    +		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
    +		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
    +	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +	 */
    +	@Test
    +	public void calculateNetworkBufNew() throws Exception {
    +		Configuration config = new Configuration();
    +
    +		// (1) defaults
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
    +			TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
    +			TaskManagerServices.calculateNetworkBuf((10L << 30), config));
    +
    +		calculateNetworkBufNew(config);
    +	}
    +
    +	/**
    +	 * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
    +	 * new configuration parameters.
    +	 *
    +	 * @param config configuration object
    +	 */
    +	private static void calculateNetworkBufNew(final Configuration config) {
    +		// (2) fixed size memory
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
    +		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
    +
    +		// note: actual network buffer memory size is independent of the totalJavaMemorySize
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +		// (3) random fraction, min, and max values
    +		Random ran = new Random();
    +		for (int i = 0; i < 1_000; ++i){
    +			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +			long max = Math.max(min, ran.nextLong());
    +			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +			long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +			final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
    +			assertTrue(networkBufMem >= min);
    +			assertTrue(networkBufMem <= max);
    +			if (networkBufMem > min && networkBufMem < max) {
    +				assertEquals((long) (javaMem * frac), networkBufMem);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
    +	 * old/new configurations.
    +	 */
    +	@SuppressWarnings("deprecation")
    +	@Test
    +	public void calculateNetworkBufMixed() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +
    +		// old + 1 new parameter = new:
    +		Configuration config1 = config.clone();
    +		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +		assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
    --- End diff --
    
    This seems overly complicated. If we used a set of well-defined values instead of the defaults, we could get rid of the min/max calls, no? This applies to the whole test.
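
    For illustration, a test with fixed values might look roughly like the sketch below. Everything in it is an assumption for this example and not code from the PR: the class name, the chosen values (0.5, 64 MB, 1 GB) and the `clamp` method, which only stands in for `calculateNetworkBuf` by mirroring the min/max expression from the diff above.

```java
final class FixedValueNetworkBufSketch {

    // Stand-in for the method under test: fraction of the total, clamped to [min, max].
    static long clamp(float fraction, long min, long max, long totalJavaMemorySize) {
        return Math.min(max, Math.max(min, (long) (fraction * totalJavaMemorySize)));
    }

    static void check(long expected, long actual) {
        if (expected != actual) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        // Fixed test values chosen for this sketch (deliberately not the real defaults).
        final float frac = 0.5f;
        final long min = 64L << 20; // 64 MB
        final long max = 1L << 30;  // 1 GB

        // Below the minimum: 0.5 * 10 MB = 5 MB, so the result is 64 MB.
        check(64L << 20, clamp(frac, min, max, 10L << 20));
        // Inside the range: 0.5 * 1 GB = 512 MB.
        check(512L << 20, clamp(frac, min, max, 1L << 30));
        // Above the maximum: 0.5 * 10 GB = 5 GB, so the result is 1 GB.
        check(1L << 30, clamp(frac, min, max, 10L << 30));
    }
}
```

    With fixed inputs the expected values become plain literals, at the cost of no longer following changes to the real defaults.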



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112246946
  
    --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
    @@ -0,0 +1,306 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.dist;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
    +import org.apache.flink.util.OperatingSystem;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Random;
    +
    +import static org.hamcrest.CoreMatchers.allOf;
    +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
    +import static org.hamcrest.Matchers.lessThanOrEqualTo;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Unit test that verifies that the task manager heap size calculation used by the bash script
    + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
    + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
    + *
    + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
    + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
    + * not need high precision.
    + */
    +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
    +
    +	/** Key that is used by <tt>config.sh</tt>. */
    +	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
    +
    +	/**
    +	 * Number of tests with random values.
    +	 *
    +	 * NOTE: calling the external test script is slow and thus low numbers are preferred for general
    +	 * testing.
    +	 */
    +	private static final int NUM_RANDOM_TESTS = 20;
    +
    +	@Before
    +	public void checkOperatingSystem() {
    +		Assume.assumeTrue("This test checks shell scripts not available on Windows.",
    +			!OperatingSystem.isWindows());
    +	}
    +
    +	/**
    +	 * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
    +	 * result as the shell script.
    +	 */
    +	@Test
    +	public void compareNetworkBufShellScriptWithJava() throws Exception {
    +		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
    +		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
    +
    +		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
    +
    +		compareNetworkBufJavaVsScript(
    +			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
    +
    +		// some automated tests with random (but valid) values
    +
    +		Random ran = new Random();
    +		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
    +			// tolerate that values differ by 1% (due to different floating point precisions)
    +			compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
    --- End diff --
    
    Oh, here I actually already print the configuration in the error message.
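
    As a rough sketch of the idea (not the code from this PR), a comparison with a relative tolerance that reports the configuration on failure could look like this. The class, both method names and the message wording are hypothetical, and the values are assumed to be non-negative byte counts.

```java
final class ToleranceCheckSketch {

    /** True if the two non-negative values differ by at most the given relative tolerance. */
    static boolean withinTolerance(long javaValue, long scriptValue, float tolerance) {
        long delta = Math.abs(javaValue - scriptValue);
        return delta <= tolerance * Math.max(javaValue, scriptValue);
    }

    /** Fails with a message that includes the configuration that produced the mismatch. */
    static void assertClose(String configDescription, long javaValue, long scriptValue, float tolerance) {
        if (!withinTolerance(javaValue, scriptValue, tolerance)) {
            throw new AssertionError("Different network buffer memory sizes with configuration: "
                + configDescription + " (java: " + javaValue + ", script: " + scriptValue + ")");
        }
    }

    public static void main(String[] args) {
        // A 0.5% difference passes with a 1% tolerance.
        assertClose("fraction=0.1, min=64MB, max=1GB", 1000L, 1005L, 0.01f);
        // Identical values pass even with a tolerance of zero.
        assertClose("fraction=0.1, min=64MB, max=1GB", 1000L, 1000L, 0.0f);
    }
}
```

    Including the configuration in the message matters most for the randomized runs, where a failure cannot otherwise be reproduced.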



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r111994207
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -376,6 +392,169 @@ private static NetworkEnvironment createNetworkEnvironment(
     	}
     
     	/**
    +	 * Calculates the amount of memory used for network buffers based on the total memory to use and
    +	 * the according configuration parameters.
    +	 *
    +	 * The following configuration parameters are involved:
    +	 * <ul>
    +	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
    +	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
    +	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
    +	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
    +	 * </ul>.
    +	 *
    +	 * @param totalJavaMemorySize
    +	 * 		overall available memory to use (heap and off-heap, in bytes)
    +	 * @param config
    +	 * 		configuration object
    +	 *
    +	 * @return memory to use for network buffers (in bytes)
    +	 */
    +	public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
    --- End diff --
    
    How about a slightly longer method name?

