Posted to issues@flink.apache.org by etiennecarriere <gi...@git.apache.org> on 2018/06/11 14:39:54 UTC

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

GitHub user etiennecarriere opened a pull request:

    https://github.com/apache/flink/pull/6149

    [FLINK-9560] Add RateLimiting for FileSystem

    ## What is the purpose of the change
    
    * Add rate limiting to FileSystem. Our primary goal is to limit the bandwidth used when writing checkpoints to object storage, because that traffic affects Flink's overall throughput.
    
    ## Brief change log
    
    Extend the existing LimitedConnectionsFileSystem class with rate-limiting mechanisms that throttle bandwidth usage. It uses the RateLimiter class provided by Guava.
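A minimal sketch of the idea in plain Java, assuming a limiter with Guava-like `acquire(int)` semantics. `ByteRateLimiter`, `RateLimitedOutputStream` and `RateLimitedOutputStreamDemo` are hypothetical names, not the PR's classes, and a counting lambda stands in for Guava's `RateLimiter` so the example runs without dependencies. As in the diffs quoted in this thread, permits are acquired after the wrapped write returns, one permit per byte.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for Guava's RateLimiter#acquire(int): blocks until permits are available. */
interface ByteRateLimiter {
    void acquire(int permits);
}

/** Sketch: throttles an OutputStream by acquiring one permit per byte written. */
final class RateLimitedOutputStream extends FilterOutputStream {
    private final ByteRateLimiter limiter;

    RateLimitedOutputStream(OutputStream out, ByteRateLimiter limiter) {
        super(out);
        this.limiter = limiter;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        limiter.acquire(1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Delegate the whole chunk; FilterOutputStream's default would call write(int) once per byte.
        out.write(b, off, len);
        if (len > 0) { // Guava-style limiters reject non-positive permit counts
            limiter.acquire(len);
        }
    }
}

final class RateLimitedOutputStreamDemo {
    /** Writes 100 bytes plus 1 byte; returns {permits acquired, bytes that reached the sink}. */
    static long[] run() {
        long[] acquired = {0};
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // A counting limiter instead of a sleeping one keeps the demo instant.
        try (RateLimitedOutputStream s = new RateLimitedOutputStream(sink, n -> acquired[0] += n)) {
            s.write(new byte[100]); // FilterOutputStream routes this to write(b, 0, 100)
            s.write(42);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new long[] {acquired[0], sink.size()};
    }
}
```

With a real limiter, `acquire` would sleep instead of counting, so the stream's throughput converges to the configured bytes per second.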
    
    ## Verifying this change
    
      - Unit test validating the configuration parsing
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): Yes. flink-core now depends on shaded Guava 18.
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No
      - The serializers: No
      - The runtime per-record code paths (performance sensitive): No
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: No
      - The file system connector: Yes
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? Ops documentation updated


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DataDome/flink FLINK-9560

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6149.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6149
    
----
commit b6e22f3bb15652780efbe6375d6b24f5bba73f4e
Author: Etienne Carriere <et...@...>
Date:   2018-06-11T12:14:54Z

    [FLINK-9560] Add RateLimiting for FileSystem
    
    Add the ability to limit the bandwidth used when reading from or writing to
    a FileSystem. This can be useful, for example, to limit the bandwidth
    used to write periodic checkpoints to object storage.

----


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195134923
  
    --- Diff: docs/ops/filesystems.md ---
    @@ -89,9 +89,9 @@ For example, if the default file system configured as `fs.default-scheme: hdfs:/
     #### Connection limiting
     
     You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number
    -of concurrent reads / writes or open connections at the same time.
    +of concurrent reads / writes or open connections at the same time. You can also limit the throughput/bandwidth used to read/write from/to the FileSystem
     
    -For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.
    +For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint. 
    --- End diff --
    
    Trailing whitespace at the end?


---

[GitHub] flink issue #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on the issue:

    https://github.com/apache/flink/pull/6149
  
    Hi @pnowojski,
    
    Unfortunately, this patch does not work because the FileSystem implementations buffer the data in a temporary file before uploading it:
    ** S3: the size of the file must be known before sending it (Content-Length is required and chunked mode is not supported) https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/model/PutObjectRequest.html
    *** flink-s3-fs-hadoop: https://github.com/Aloisius/hadoop-s3a/blob/master/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L808
    *** flink-s3-fs-presto: https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L991
    ** Swift: even though chunked uploads are possible, the Hadoop implementation uses a temp file
    *** flink-swift-fs-hadoop: https://github.com/c9n/hadoop/blob/master/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeOutputStream.java#L78
    
    So I propose closing the PR, as this is not the right level at which to handle rate limiting.
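A simplified model of the problem described above. `BufferThenUploadStream` and `UploadTimelineDemo` are hypothetical: writes go to a local buffer (a `ByteArrayOutputStream` stands in for the temp file) and a single upload is issued in `close()`, once the length is known. Under that assumption, any throttling wrapper around such a stream only paces how fast bytes reach the local buffer, while the network transfer still happens in one burst at close time.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an S3A/Swift-style stream: buffer locally, upload everything in close(). */
final class BufferThenUploadStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); // stands in for the temp file
    private final List<Integer> networkTransfers; // sizes of simulated network uploads
    private boolean closed;

    BufferThenUploadStream(List<Integer> networkTransfers) {
        this.networkTransfers = networkTransfers;
    }

    @Override
    public void write(int b) {
        buffer.write(b); // local only: no network traffic yet
    }

    @Override
    public void write(byte[] b, int off, int len) {
        buffer.write(b, off, len); // local only: no network traffic yet
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            // The length is known only now, so the whole object is sent in one transfer.
            networkTransfers.add(buffer.size());
        }
    }
}

final class UploadTimelineDemo {
    /** Returns {uploads before close, uploads after close, size of the first upload}. */
    static int[] run() {
        List<Integer> transfers = new ArrayList<>();
        BufferThenUploadStream s = new BufferThenUploadStream(transfers);
        byte[] chunk = new byte[100];
        for (int i = 0; i < 10; i++) {
            s.write(chunk, 0, chunk.length); // a rate-limiting wrapper would only pace these calls
        }
        int before = transfers.size();
        s.close();
        return new int[] {before, transfers.size(), transfers.get(0)};
    }
}
```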


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195682578
  
    --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java ---
    @@ -122,6 +122,42 @@ public void testLimitingInputStreams() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testLimitingRateLimitingStream() throws Exception {
    +		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
    +				LocalFileSystem.getSharedInstance(),
    +				Integer.MAX_VALUE,
    +				Integer.MAX_VALUE,
    +				Integer.MAX_VALUE,
    +				0,
    +				0,
    +				10000, // Limit write to 10 kbytes/s
    +				10000); // Limit read to 10 kbytes/s
    +		File file = tempFolder.newFile();
    +		Path path = new Path(file.toURI());
    +		long durationWrite = System.currentTimeMillis();
    +		try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
    +			final Random rnd = new Random();
    +			final byte[] data = new byte[100];
    +			for (int i = 0; i < (1000 + 10); i++) {
    +				rnd.nextBytes(data);
    +				stream.write(data);
    +			}
    +		}
    +		durationWrite = System.currentTimeMillis() - durationWrite;
    +
    +		long durationRead = System.currentTimeMillis();
    +		final byte[] data = new byte[100];
    +		try (FSDataInputStream stream = limitedFs.open(path)) {
    +			//noinspection StatementWithEmptyBody
    +			while (stream.read(data) != -1) {}
    +		}
    +		durationRead = System.currentTimeMillis() - durationRead;
    +		file.delete();
    +		assertTrue(durationWrite > 10000);
    +		assertTrue(durationRead > 8000); // Less stability with read limiter than write
    --- End diff --
    
    I agree about the burst behaviour of Guava's RateLimiter, but I can't explain why read and write don't behave symmetrically:
    * For writes, I am always above 10 s (in my test)
    * For reads, I am sometimes below
    I ran several different tests yesterday and have no clue what causes the difference.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6149


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195685352
  
    
    I think burst credits are accumulated over time. If you create the RateLimiter in the `LimitedConnectionsFileSystem` constructor and immediately start using the output rate limiter, it will not have accumulated any burst credits. However, the input rate limiter is used only after a couple of seconds (once the write has finished), so by then it has accumulated burst credits.
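To illustrate this explanation, here is a simulated-clock model of a bursty limiter. It is a simplified model (an assumption, not Guava's code) of the behaviour documented for Guava's `SmoothBursty`: idle time is converted into stored permits capped at one second's worth, a freshly created limiter starts with no stored permits, and the cost of each request is paid by the next caller. `BurstyLimiterModel` and `BurstDemo` are hypothetical names, and I/O itself is assumed to take no time. Both limiters are created at t = 0, mirroring the constructor, and the workload matches the test quoted earlier (1010 chunks of 100 bytes at 10,000 bytes/s).

```java
/** Simplified model of Guava's SmoothBursty accounting; time is simulated, so "sleeping" advances nowMicros. */
final class BurstyLimiterModel {
    private final double stableIntervalMicros; // microseconds per permit (byte)
    private final double maxPermits;           // permitsPerSecond * maxBurstSeconds
    private double storedPermits = 0.0;        // a fresh limiter starts without burst credits
    private long nextFreeMicros;               // when the next request may proceed without waiting
    private long nowMicros;                    // simulated clock

    BurstyLimiterModel(double permitsPerSecond, double maxBurstSeconds) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = permitsPerSecond * maxBurstSeconds;
    }

    long nowMicros() { return nowMicros; }

    void setNowMicros(long micros) { nowMicros = micros; }

    /** Waits for the previous request's debt, then books this one (its cost is charged to the next caller). */
    void acquire(int permits) {
        if (permits <= 0) {
            throw new IllegalArgumentException("permits must be positive: " + permits);
        }
        // Idle time becomes stored "burst credits", capped at maxPermits.
        if (nowMicros > nextFreeMicros) {
            double newPermits = (nowMicros - nextFreeMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeMicros = nowMicros;
        }
        long waitMicros = Math.max(0, nextFreeMicros - nowMicros);
        double fromStore = Math.min(permits, storedPermits); // stored permits cost no time
        double fresh = permits - fromStore;
        nextFreeMicros += (long) (fresh * stableIntervalMicros);
        storedPermits -= fromStore;
        nowMicros += waitMicros; // simulated sleep
    }
}

final class BurstDemo {
    /** Returns {writeMicros, readMicros} for 1010 chunks of 100 bytes at 10,000 bytes/s. */
    static long[] simulate() {
        BurstyLimiterModel out = new BurstyLimiterModel(10_000, 1.0);
        BurstyLimiterModel in = new BurstyLimiterModel(10_000, 1.0);

        long writeStart = out.nowMicros();
        for (int i = 0; i < 1010; i++) {
            out.acquire(100); // write(data) of 100 bytes
        }
        long writeMicros = out.nowMicros() - writeStart;

        // Reads start only after the writes, so the input limiter has been idle the whole time.
        in.setNowMicros(out.nowMicros());
        long readStart = in.nowMicros();
        for (int i = 0; i < 1010; i++) {
            in.acquire(100); // read(data) returning 100 bytes
        }
        long readMicros = in.nowMicros() - readStart;
        return new long[] {writeMicros, readMicros};
    }

    public static void main(String[] args) {
        long[] d = simulate();
        System.out.println("write: " + d[0] / 1000 + " ms, read: " + d[1] / 1000 + " ms");
    }
}
```

Under these assumptions `main` prints `write: 10090 ms, read: 9090 ms`. The writer never has stored credits (and the last request's cost is never paid), while the reader spends 10,000 stored permits, one second's worth, without waiting. That is consistent with writes staying above 10 s while reads can land below it.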
    
    Btw, that makes me think: do we really want two separate limiters for reads and writes? Are the quotas on S3 also accounted separately, or is there usually a single quota for reads and writes together? If the latter, it would be handier to have one `RateLimiter` shared between reads and writes.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195138105
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -731,6 +791,9 @@ public boolean checkNewBytesAndMark(long timestamp) throws IOException {
     		public void write(int b) throws IOException {
     			try {
     				originalStream.write(b);
    +				if (fs.outputRateLimiter != null){
    --- End diff --
    
    With optional: `rate.ifPresent(RateLimiter::acquire);`
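A small self-contained sketch of the `Optional` pattern suggested here. `Limiter` is a hypothetical interface standing in for Guava's `RateLimiter` (only `acquire()` and `acquire(int)` are modelled) and `OutputThrottle` is a hypothetical holder; an empty `Optional` means rate limiting is disabled, so no null checks are needed.

```java
import java.util.Optional;

/** Hypothetical stand-in for the two Guava RateLimiter methods used in the review comments. */
interface Limiter {
    double acquire(int permits);

    default double acquire() {
        return acquire(1); // Guava's no-arg acquire() also takes a single permit
    }
}

/** Sketch of the null-free pattern: an absent Optional means "no rate limit configured". */
final class OutputThrottle {
    private final Optional<Limiter> outputRateLimiter;

    OutputThrottle(Optional<Limiter> outputRateLimiter) {
        this.outputRateLimiter = outputRateLimiter;
    }

    /** Called after write(int b). */
    void afterSingleByte() {
        outputRateLimiter.ifPresent(Limiter::acquire);
    }

    /** Called after write(byte[] b, int off, int len). */
    void afterChunk(int len) {
        if (len > 0) { // Guava rejects non-positive permit counts
            outputRateLimiter.ifPresent(limiter -> limiter.acquire(len));
        }
    }
}
```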


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r197729438
  
    
    @etiennecarriere could you apply the test changes that I suggested above, so that we could merge this PR?


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195657899
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -940,7 +932,9 @@ public int read() throws IOException {
     		public int read(byte[] b) throws IOException {
     			try {
     				int len = originalStream.read(b);
    -				fs.inputRateLimiter.ifPresent(limiter -> limiter.acquire(len));
    +				if (len >= 0) {
    --- End diff --
    
    was adding this `if (len >= 0)` check a bug fix?


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r196008935
  
    
    Ok, I'm fine to keep it as it is.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195341250
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
    @@ -278,15 +278,15 @@
     	 * Unlimited be default.
     	 */
     	public static ConfigOption<Long> rateLimitingInputBytesPerSeconds(String scheme) {
    -		return ConfigOptions.key("fs." + scheme + ".limit.rateLimitingInput").defaultValue(0L);
    +		return ConfigOptions.key("fs." + scheme + ".limit.input-rate-limit").defaultValue(0L);
    --- End diff --
    
    Please also rename the variables/methods to match the new config name and the other properties above:
    `s/rateLimitingInputBytesPerSeconds/fileSystemConnectionLimitInputRate/g`
    `s/rateLimitingOutputBytesPerSeconds/fileSystemConnectionLimitOutputRate/g`


---

[GitHub] flink issue #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on the issue:

    https://github.com/apache/flink/pull/6149
  
    @pnowojski, I have already addressed the small remarks, but not the test part yet.
    I will try to propose a first version of the test in the next few days.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195136244
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java ---
    @@ -62,7 +62,9 @@ public FileSystem create(URI fsUri) throws IOException {
     		FileSystem original = factory.create(fsUri);
     		return new LimitedConnectionsFileSystem(original,
     				settings.limitTotal, settings.limitOutput, settings.limitInput,
    -				settings.streamOpenTimeout, settings.streamInactivityTimeout);
    +				settings.streamOpenTimeout, settings.streamInactivityTimeout,
    --- End diff --
    
    Please reformat the previous arguments (one per line), since you have just touched this one.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195134228
  
    --- Diff: docs/ops/filesystems.md ---
    @@ -102,6 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
     fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
     fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
     fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
    +fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
    +fs.<scheme>.limit.rateLimitingOutput: (bytes/s, 0 means infinite)
    --- End diff --
    
    `output-rate-limit`? 


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195340311
  
    --- Diff: docs/ops/filesystems.md ---
    @@ -102,8 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
     fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
     fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
     fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
    -fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
    -fs.<scheme>.limit.rateLimitingOutput: (bytes/s, 0 means infinite)
    +fs.<scheme>.limit.input-rate-limit: (bytes/s, 0 means infinite . By default, there is no limits)
    --- End diff --
    
    `there is no limit` (limit instead of limits)
    
    And sorry for the confusion, but I just noticed that the word `limit` occurs twice in the property name. It would be enough to name them:
    `fs.<scheme>.limit.input-rate`
    and
    `fs.<scheme>.limit.output-rate`
    Sorry for my previous comment pointing in the wrong direction!
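A sketch of how the renamed keys could be read, assuming the naming proposed in this comment (`fs.<scheme>.limit.input-rate` and `fs.<scheme>.limit.output-rate`, in bytes/s, with 0 meaning no limit). `RateLimitConfigSketch` is hypothetical and uses a plain `Map` instead of Flink's `Configuration`; rejecting negative values is an assumption of this sketch, not something the PR specifies.

```java
import java.util.Map;
import java.util.OptionalLong;

final class RateLimitConfigSketch {
    static String inputRateKey(String scheme) {
        return "fs." + scheme + ".limit.input-rate";
    }

    static String outputRateKey(String scheme) {
        return "fs." + scheme + ".limit.output-rate";
    }

    /** Returns the configured rate in bytes/s, or empty if the key is unset or 0 (no limit). */
    static OptionalLong rate(Map<String, String> conf, String key) {
        String raw = conf.get(key);
        if (raw == null) {
            return OptionalLong.empty();
        }
        long value = Long.parseLong(raw.trim());
        if (value < 0) { // assumption: negative rates are treated as a configuration error
            throw new IllegalArgumentException(key + " must be >= 0, got " + value);
        }
        return value == 0 ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```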


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195346228
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -1032,19 +1113,50 @@ public ConnectionLimitingSettings(
     				int limitOutput,
     				long streamOpenTimeout,
     				long streamInactivityTimeout) {
    +			this(limitTotal, limitInput, limitOutput, streamOpenTimeout, streamInactivityTimeout, 0, 0);
    +		}
    +
    +
    +		/**
    +		 * Creates a new ConnectionLimitingSettings with the given parameters.
    +		 *
    +		 * @param limitTotal The limit for the total number of connections, or 0, if no limit.
    +		 * @param limitInput The limit for the number of input stream connections, or 0, if no limit.
    +		 * @param limitOutput The limit for the number of output stream connections, or 0, if no limit.
    +		 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
    +		 *                                no more connections are currently permitted.
    +		 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +		 *                                bytes before it is closed as inactive.
    +		 * @param rateLimitingInputBytesPerSecond the allowed rate (bytes/s) that can be red from FileSystem
    --- End diff --
    
    Same here: throughout the whole file, please unify the argument naming with the config parameter (`rateLimitingInput***` -> `limitInputRateBytesPerSecond`).


---

[GitHub] flink issue #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on the issue:

    https://github.com/apache/flink/pull/6149
  
    @pnowojski, I added a unit test that validates the rate-limiting feature, but:
    * It adds 20 s to the unit tests. Would it be better to move it to the integration tests (even though it is not really an integration test)?
    * I use 10 seconds for each test (read and write) to get some stability.
    
    I'm open to your suggestions.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195345480
  
    --- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java ---
    @@ -122,4 +125,28 @@ public static boolean hasHDFSDelegationToken() throws Exception {
     		}
     		return false;
     	}
    +
    +	/**
    +	 * Parse configuration and load the limited FS wrapper.
    +	 */
    +	public static FileSystem limitIfConfigured(FileSystem fs, String scheme, org.apache.flink.configuration.Configuration config) {
    --- End diff --
    
    ```
    public static FileSystem limitIfConfigured(
            FileSystem fs,
            String scheme,
            Optional<org.apache.flink.configuration.Configuration> config) {
        if (!config.isPresent()) {
            return fs;
        }
        ....
    ```


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195687859
  
    
    My initial need is only for the output RateLimiter: we want to limit the rate at which checkpoints are sent to S3/Hadoop, because that traffic competes with Flink's data exchange between TaskManagers (our infrastructure has only gigabit networking).
    I implemented both input and output because some people might also want to limit input (even though I don't see real use cases for an input rate limiter).
    I implemented separate limiters because on a full-duplex network you may want to prioritize one direction of traffic, but I am also OK with a shared rate limiter.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195681277
  
    
    I have debugged it, and my guess is that this instability comes from bursty reads (`org.apache.flink.shaded.guava18.com.google.common.util.concurrent.SmoothRateLimiter.SmoothBursty`). The default `RateLimiter` comes with a maximum burst duration of 1 second, and these "burst credits" accumulate for reads while the test is writing. I have modified the test to take that into account (and added a test case for single-byte writes/reads); please check the code below:
    
    ```
    
    	@Test
    	public void testLimitingRateLimitingStream() throws Exception {
    		testLimitingRateLimitingStream(10);
    	}
    
    	@Test
    	public void testLimitingRateLimitingStreamSingleByteWrites() throws Exception {
    		testLimitingRateLimitingStream(1);
    	}
    
    	private void testLimitingRateLimitingStream(int chunkSize) throws Exception {
    		long dataRate = 1000; // Limit reads and writes to 1000 bytes/s
    		// Allowed burst duration in SmoothBursty when creating RateLimiter. This value is copied from `RateLimiter#create(SleepingStopwatch, double)`
    		long burstDurationSeconds = 1;
    		long burstData = dataRate * burstDurationSeconds;
    		long dataToWrite = 2000;
    		assertTrue(
    			"Because of burst writes/reads, dataToWrite must be larger than burstData",
    			dataToWrite > burstData);
    		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
    			LocalFileSystem.getSharedInstance(),
    			Integer.MAX_VALUE,
    			Integer.MAX_VALUE,
    			Integer.MAX_VALUE,
    			0,
    			0,
    			dataRate,
    			dataRate);
    		File file = tempFolder.newFile();
    		Path path = new Path(file.toURI());
    		long durationWrite = System.currentTimeMillis();
    		try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
    			final byte[] data = new byte[chunkSize];
    			for (int i = 0; i < dataToWrite; i += chunkSize) {
    				if (chunkSize == 1) {
    					stream.write(42);
    				}
    				else {
    					stream.write(data);
    				}
    			}
    		}
    		durationWrite = System.currentTimeMillis() - durationWrite;
    
    		long durationRead = System.currentTimeMillis();
    		try (FSDataInputStream stream = limitedFs.open(path)) {
    			final byte[] data = new byte[chunkSize];
    			int result = 0;
    			while (result != -1) {
    				result = chunkSize == 1 ? stream.read() : stream.read(data);
    			}
    		}
    		durationRead = System.currentTimeMillis() - durationRead;
    		file.delete();
    		long expectedDuration = (long) (((dataToWrite - burstData) * 1000) / dataRate * (0.9));
    		assertThat(durationWrite, greaterThan(expectedDuration));
    		assertThat(durationRead, greaterThan(expectedDuration));
    	}
    ```
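For the values chosen in this test, the threshold computed in its last lines works out as follows, writing r for `dataRate`, D for `dataToWrite`, T_burst for `burstDurationSeconds` and B for `burstData`. The 0.9 factor appears to be slack for timing noise.

```latex
B = r \cdot T_{\text{burst}} = 1000\,\tfrac{\text{bytes}}{\text{s}} \cdot 1\,\text{s} = 1000\ \text{bytes}

t_{\text{expected}} = 0.9 \cdot \frac{(D - B) \cdot 1000}{r}
  = 0.9 \cdot \frac{(2000 - 1000) \cdot 1000}{1000}
  = 900\ \text{ms}
```

So both the write and the read phase must take longer than 900 ms. Because one full burst allowance is subtracted for both phases, the bound is conservative for writes, whose limiter starts without stored credits.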


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195138248
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -741,6 +804,10 @@ public void write(int b) throws IOException {
     		public void write(byte[] b, int off, int len) throws IOException {
     			try {
     				originalStream.write(b, off, len);
    +				if (fs.outputRateLimiter != null){
    --- End diff --
    
    With optional: `rate.ifPresent(limiter -> limiter.acquire(len));`


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195682592
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -940,7 +932,9 @@ public int read() throws IOException {
     		public int read(byte[] b) throws IOException {
     			try {
     				int len = originalStream.read(b);
    -				fs.inputRateLimiter.ifPresent(limiter -> limiter.acquire(len));
    +				if (len >= 0) {
    --- End diff --
    
    Yes:
    + InputStream.read(byte[]) returns the number of bytes read, or -1 if the end of the stream has been reached.
    + RateLimiter.acquire(n) only accepts positive numbers (which is logical).
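A sketch of the read-side guard. If I read Guava's `RateLimiter` correctly, `acquire(int)` rejects any count that is not strictly positive, so a `len >= 0` check would still pass 0 through when `read` is called with an empty array (`InputStream.read(byte[])` returns 0 in that case). The sketch therefore uses `n > 0`. `PermitLimiter`, `RateLimitedInputStream` and `ReadGuardDemo` are hypothetical names; the strict lambda imitates Guava's argument check.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical limiter interface; the demo's implementation rejects non-positive counts like Guava. */
interface PermitLimiter {
    void acquire(int permits);
}

final class RateLimitedInputStream extends FilterInputStream {
    private final PermitLimiter limiter;

    RateLimitedInputStream(InputStream in, PermitLimiter limiter) {
        super(in);
        this.limiter = limiter;
    }

    @Override
    public int read() throws IOException {
        int b = in.read();
        if (b != -1) { // -1 means end of stream: nothing was transferred
            limiter.acquire(1);
        }
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = in.read(b, off, len);
        if (n > 0) { // skips both -1 (end of stream) and 0 (e.g. an empty buffer)
            limiter.acquire(n);
        }
        return n;
    }
}

final class ReadGuardDemo {
    /** Reads a 10-byte stream through a strict limiter; returns the permits acquired. */
    static int run() {
        int[] permits = {0};
        PermitLimiter strict = p -> {
            if (p <= 0) {
                throw new IllegalArgumentException("Requested permits must be positive: " + p);
            }
            permits[0] += p;
        };
        byte[] buf = new byte[4];
        try (InputStream s = new RateLimitedInputStream(new ByteArrayInputStream(new byte[10]), strict)) {
            s.read(new byte[0]);          // returns 0: a `len >= 0` guard would call acquire(0) here
            while (s.read(buf) != -1) { } // 4 + 4 + 2 bytes, then -1
            s.read();                     // -1 at end of stream: no permit taken
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return permits[0];
    }
}
```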



---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195342365
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -313,14 +313,22 @@ public int getNumberOfOpenInputStreams() {
     	 * Get the rate limitation on Input (bytes/s).
     	 */
     	public long getRateLimitingInput(){
    -		return (long) inputRateLimiter.getRate();
    +		if (inputRateLimiter.isPresent()){
    --- End diff --
    
    Nitty nit (matter of taste): `return inputRateLimiter.map(RateLimiter::getRate).orElse(0.0).longValue();`. If you prefer if/else, I'm fine with that.
    
    Btw, `Optional` saved the day: previously, some potential null pointer exceptions had been missed :)
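Here is a hedged sketch of the `Optional`-based getter. `java.util.Optional` offers `orElse` (it has no `getOrDefault`). Guava's `RateLimiter.getRate()` returns a `double`, so converting inside `map` avoids casting a boxed `Double` straight to `long`, which does not compile. `RateLimiterView` and `RateLimitedFs` are hypothetical names, and the interface only mimics `getRate()`.

```java
import java.util.Optional;

/** Hypothetical view of Guava's RateLimiter: getRate() is in permits (bytes) per second. */
interface RateLimiterView {
    double getRate();
}

/** Hypothetical holder for the optional input limiter. */
class RateLimitedFs {
    private final Optional<RateLimiterView> inputRateLimiter;

    RateLimitedFs(Optional<RateLimiterView> inputRateLimiter) {
        this.inputRateLimiter = inputRateLimiter;
    }

    /** Returns the input rate limit in bytes/s, or 0 when no limiter is configured. */
    long getRateLimitingInput() {
        return inputRateLimiter.map(limiter -> (long) limiter.getRate()).orElse(0L);
    }
}
```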


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195344817
  
    --- Diff: flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java ---
    @@ -132,7 +132,12 @@ else if (scheme != null && authority == null) {
     			final S3AFileSystem fs = new S3AFileSystem();
     			fs.initialize(fsUri, hadoopConfig);
     
    -			return new HadoopFileSystem(fs);
    +			if (flinkConfig != null) {
    --- End diff --
    
    Can you pull this `if` into a `HadoopUtils.limitIfConfigured` method? That way those invocations would simply look like `limitIfConfigured(new HadoopFileSystem(fs), scheme, Optional.ofNullable(flinkConfig))`, without a duplicated if/else branch.
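The suggested helper might take the following shape. This is a hedged sketch only, and several simplifications are assumptions:

- `Fs`, `LimitedFs` and `HadoopUtilsSketch` are hypothetical stand-ins for `FileSystem`, `LimitedConnectionsFileSystem` and `HadoopUtils`.
- The configuration is modeled as a plain `Map<String, String>`.
- Only the input rate is handled.
- The key `fs.<scheme>.limit.input-rate-limit` is hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical minimal file system abstraction. */
interface Fs {
    String describe();
}

/** Hypothetical wrapper standing in for LimitedConnectionsFileSystem. */
class LimitedFs implements Fs {
    final Fs delegate;
    final long inputBytesPerSecond;

    LimitedFs(Fs delegate, long inputBytesPerSecond) {
        this.delegate = delegate;
        this.inputBytesPerSecond = inputBytesPerSecond;
    }

    @Override
    public String describe() {
        return "limited(" + delegate.describe() + ", " + inputBytesPerSecond + " B/s)";
    }
}

final class HadoopUtilsSketch {
    private HadoopUtilsSketch() {}

    /** Wraps fs only when a positive input rate is configured for the scheme. */
    static Fs limitIfConfigured(Fs fs, String scheme, Optional<Map<String, String>> flinkConfig) {
        long rate = flinkConfig
                .map(conf -> conf.get("fs." + scheme + ".limit.input-rate-limit")) // empty if key absent
                .map(Long::parseLong)
                .orElse(0L);
        return rate > 0 ? new LimitedFs(fs, rate) : fs;
    }
}
```

Each factory then reduces to a single `return limitIfConfigured(...)` call, so the if/else lives in one place.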


---

[GitHub] flink issue #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/6149
  
    @etiennecarriere that's bad news. Nevertheless, it's good that you found it! Thanks for the contribution and for making sure that it does (not :( ) work.


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195134150
  
    --- Diff: docs/ops/filesystems.md ---
    @@ -102,6 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
     fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
     fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
     fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
    +fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
    --- End diff --
    
    `input-rate-limit`? Also, please document the default value. Or is the default value visible via some automatically generated docs?
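One way to keep the documented default next to the key is sketched below with a hypothetical `RateLimitOption` class (this is not Flink's `ConfigOption` API). The same object both renders the documentation line and parses the value. Treating negative values like 0 (no limit) is an assumption, borrowed from the `0/-1 mean no limit` convention of the connection limits above.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical option descriptor: key plus default, so docs can print both. */
final class RateLimitOption {
    final String key;
    final long defaultBytesPerSecond;

    RateLimitOption(String key, long defaultBytesPerSecond) {
        this.key = key;
        this.defaultBytesPerSecond = defaultBytesPerSecond;
    }

    /** Hypothetical key following the suggested naming, e.g. fs.s3.limit.input-rate-limit. */
    static RateLimitOption inputRateLimit(String scheme) {
        return new RateLimitOption("fs." + scheme + ".limit.input-rate-limit", 0L);
    }

    /** Documentation line that states the default value explicitly. */
    String docLine() {
        return key + ": (bytes/s, 0 means no limit, default: " + defaultBytesPerSecond + ")";
    }

    /** Configured limit, or empty when unset or non-positive (no limit). */
    Optional<Long> read(Map<String, String> config) {
        String raw = config.get(key);
        long value = raw == null ? defaultBytesPerSecond : Long.parseLong(raw.trim());
        return value > 0 ? Optional.of(value) : Optional.empty();
    }
}
```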


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195142699
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -166,12 +174,47 @@ public LimitedConnectionsFileSystem(
     			int maxNumOpenInputStreams,
     			long streamOpenTimeout,
     			long streamInactivityTimeout) {
    +		this(originalFs, maxNumOpenStreamsTotal, maxNumOpenOutputStreams, maxNumOpenInputStreams, streamOpenTimeout, streamInactivityTimeout, 0, 0);
    +	}
    +
    +	/**
    +	 * Creates a new output connection limiting file system, limiting input and output streams with
    +	 * potentially different quotas.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent that the limited number of connections gets
    +	 * stuck on only blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
    +	 * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
    +	 *                                no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +	 *                                bytes before it is closed as inactive.
    +	 * @param inputBytesPerSecondRate The rate limiting of Bytes read per second on the FileSystem (0 means no limit)
    +	 * @param outputBytesPerSecondRate The rate limiting of Bytes written per second on the FileSystem (0 means no limit)
    +	 */
    +
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			int maxNumOpenOutputStreams,
    +			int maxNumOpenInputStreams,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout,
    +			long inputBytesPerSecondRate,
    +			long outputBytesPerSecondRate
    +			) {
    --- End diff --
    
    Remove the newline after the last argument.
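Here is a trimmed-down sketch of the telescoping-constructor pattern in this diff. The pre-existing constructor delegates with `0, 0`, which the new constructor interprets as "no rate limit", and the closing parenthesis sits on the same line as the last argument. Guava's `RateLimiter.create` rejects a non-positive rate, so a rate of 0 has to map to `Optional.empty()` rather than to a limiter. `ThrottledFs`, `SimpleRateLimiter` and `limiterFor` are hypothetical. The real class takes the full set of connection-limit parameters and presumably builds Guava `RateLimiter`s instead.

```java
import java.util.Optional;

/** Hypothetical stand-in for Guava's RateLimiter, created from a bytes-per-second rate. */
final class SimpleRateLimiter {
    final long bytesPerSecond;

    SimpleRateLimiter(long bytesPerSecond) {
        this.bytesPerSecond = bytesPerSecond;
    }
}

/** Trimmed-down sketch: only one timeout and the two rate parameters are kept. */
class ThrottledFs {
    final long streamOpenTimeout;
    final Optional<SimpleRateLimiter> inputRateLimiter;
    final Optional<SimpleRateLimiter> outputRateLimiter;

    /** Existing signature: delegates with 0 rates, meaning "no rate limit". */
    ThrottledFs(long streamOpenTimeout) {
        this(streamOpenTimeout, 0, 0);
    }

    ThrottledFs(
            long streamOpenTimeout,
            long inputBytesPerSecondRate,
            long outputBytesPerSecondRate) {
        this.streamOpenTimeout = streamOpenTimeout;
        this.inputRateLimiter = limiterFor(inputBytesPerSecondRate);
        this.outputRateLimiter = limiterFor(outputBytesPerSecondRate);
    }

    /** A rate of 0 (or less) means "no limit", so no limiter is created. */
    private static Optional<SimpleRateLimiter> limiterFor(long bytesPerSecond) {
        return bytesPerSecond > 0 ? Optional.of(new SimpleRateLimiter(bytesPerSecond)) : Optional.empty();
    }
}
```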


---

[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195137063
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -88,6 +90,12 @@
     	/** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
     	private final long streamInactivityTimeoutNanos;
     
    +	/** Rate limiter of incoming bytes for this filesystem. */
    +	private final RateLimiter inputRateLimiter;
    --- End diff --
    
    Instead of a nullable field, please use `Optional<RateLimiter>` for both fields. Annotating with `@Nullable` wouldn't suffice, since unfortunately we do not enforce such annotations, whereas `Optional` is always enforced.
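Here is a minimal sketch of the two fields declared as `Optional`. It assumes a hypothetical `ByteRateLimiter` interface in place of Guava's `RateLimiter` and a hypothetical `RateLimits` holder class. The field type itself forces every call site to handle the "no limit" case, whereas a `@Nullable` annotation alone does not unless a checker enforces it.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for Guava's RateLimiter. */
interface ByteRateLimiter {
    void acquire(int permits);
}

/** Sketch of the two limiter fields as Optional instead of nullable references. */
class RateLimits {
    /** Rate limiter of incoming bytes; empty means "no limit". */
    private final Optional<ByteRateLimiter> inputRateLimiter;

    /** Rate limiter of outgoing bytes; empty means "no limit". */
    private final Optional<ByteRateLimiter> outputRateLimiter;

    RateLimits(Optional<ByteRateLimiter> inputRateLimiter, Optional<ByteRateLimiter> outputRateLimiter) {
        // The Optional itself must never be null; fail fast if a caller passes null.
        this.inputRateLimiter = Objects.requireNonNull(inputRateLimiter);
        this.outputRateLimiter = Objects.requireNonNull(outputRateLimiter);
    }

    void throttleRead(int bytes) {
        // inputRateLimiter.acquire(bytes) would not compile: absence must be handled explicitly.
        if (bytes > 0) {
            inputRateLimiter.ifPresent(limiter -> limiter.acquire(bytes));
        }
    }

    void throttleWrite(int bytes) {
        if (bytes > 0) {
            outputRateLimiter.ifPresent(limiter -> limiter.acquire(bytes));
        }
    }
}
```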


---