You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/12/05 11:32:07 UTC

[2/2] flink git commit: [FLINK-8198] [core] Fix condition for parsing ConnectionLimitingSettings

[FLINK-8198] [core] Fix condition for parsing ConnectionLimitingSettings


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d4762c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d4762c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d4762c6

Branch: refs/heads/master
Commit: 2d4762c6c0bc73845549575f21fd3f8dbc466aa9
Parents: 23ea197
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 5 12:21:08 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 5 12:21:08 2017 +0100

----------------------------------------------------------------------
 .../core/fs/LimitedConnectionsFileSystem.java   |  2 +-
 .../fs/LimitedConnectionsConfigurationTest.java | 56 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d4762c6/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
index 5353563..fdf54e0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
@@ -1074,7 +1074,7 @@ public class LimitedConnectionsFileSystem extends FileSystem {
 			checkLimit(limitOut, limitOutOption);
 
 			// create the settings only, if at least one limit is configured
-			if (totalLimit <= 0 || limitIn <= 0 || limitOut <= 0) {
+			if (totalLimit <= 0 && limitIn <= 0 && limitOut <= 0) {
 				// no limit configured
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d4762c6/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
index 4742a7e..2c30ce8 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
 import org.apache.flink.testutils.TestFileSystem;
 
 import org.junit.Rule;
@@ -29,6 +31,8 @@ import java.net.URI;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -40,6 +44,10 @@ public class LimitedConnectionsConfigurationTest {
 	@Rule
 	public final TemporaryFolder tempDir = new TemporaryFolder();
 
+	/**
+	 * This test validates that the File System is correctly wrapped by the
+	 * file system factories when the corresponding entries are in the configuration.
+	 */
 	@Test
 	public void testConfiguration() throws Exception {
 		final String fsScheme = TestFileSystem.SCHEME;
@@ -81,4 +89,52 @@ public class LimitedConnectionsConfigurationTest {
 			FileSystem.initialize(new Configuration());
 		}
 	}
+
+	/**
+	 * This test checks that the file system connection limiting configuration object
+	 * is properly created.
+	 */
+	@Test
+	public void testConnectionLimitingSettings() {
+		final String scheme = "testscheme";
+
+		// empty config
+		assertNull(ConnectionLimitingSettings.fromConfig(new Configuration(), scheme));
+
+		// only total limit set
+		{
+			Configuration conf = new Configuration();
+			conf.setInteger(CoreOptions.fileSystemConnectionLimit(scheme), 10);
+
+			ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+			assertNotNull(settings);
+			assertEquals(10, settings.limitTotal);
+			assertEquals(0, settings.limitInput);
+			assertEquals(0, settings.limitOutput);
+		}
+
+		// only input limit set
+		{
+			Configuration conf = new Configuration();
+			conf.setInteger(CoreOptions.fileSystemConnectionLimitIn(scheme), 10);
+
+			ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+			assertNotNull(settings);
+			assertEquals(0, settings.limitTotal);
+			assertEquals(10, settings.limitInput);
+			assertEquals(0, settings.limitOutput);
+		}
+
+		// only output limit set
+		{
+			Configuration conf = new Configuration();
+			conf.setInteger(CoreOptions.fileSystemConnectionLimitOut(scheme), 10);
+
+			ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+			assertNotNull(settings);
+			assertEquals(0, settings.limitTotal);
+			assertEquals(0, settings.limitInput);
+			assertEquals(10, settings.limitOutput);
+		}
+	}
 }