You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/12/03 07:03:02 UTC

[flink] branch master updated (9956bc7 -> 8ba8337)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9956bc7  [FLINK-10798] Add the version number of Flink 1.7 to MigrationVersion
     new 529288d  [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.
     new ffa0a9e  [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders
     new 419a353  [FLINK-10149][mesos] Make returned port keys set immutable.
     new 8ba8337  [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/mesos/configuration/MesosOptions.java    |  5 ++--
 .../clusterframework/LaunchableMesosWorker.java    | 20 ++++++++------
 .../LaunchableMesosWorkerTest.java                 | 32 ++++++++++++++++------
 3 files changed, 38 insertions(+), 19 deletions(-)


[flink] 03/04: [FLINK-10149][mesos] Make returned port keys set immutable.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 419a3535e8aa1688828c03ef022c7aee3ba56700
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:20:18 2018 +0100

    [FLINK-10149][mesos] Make returned port keys set immutable.
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 81ee795..1eb5679 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -353,7 +353,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 				.forEach(tmPortKeys::add);
 		}
 
-		return tmPortKeys;
+		return Collections.unmodifiableSet(tmPortKeys);
 	}
 
 	@Override


[flink] 02/04: [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ffa0a9e453b8f770a8fe8db08f8324de330db569
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:17:14 2018 +0100

    [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 35bea99..81ee795 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -349,7 +349,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		if (portKeys != null) {
 			Arrays.stream(portKeys.split(","))
 				.map(String::trim)
-				.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+				.peek(key -> LOG.debug("Adding port key {} to mesos request", key))
 				.forEach(tmPortKeys::add);
 		}
 


[flink] 01/04: [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 529288dfac1fb82b3c15d5acaeec2e54676a930d
Author: Rune Skou Larsen <rs...@trifork.com>
AuthorDate: Wed Aug 15 12:33:54 2018 +0200

    [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.
    
    This closes #7203.
---
 .../apache/flink/mesos/configuration/MesosOptions.java    |  5 +++--
 .../runtime/clusterframework/LaunchableMesosWorker.java   | 10 ++++++----
 .../clusterframework/LaunchableMesosWorkerTest.java       | 15 +++++++++++++++
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 426a891..0c4e1f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -136,8 +136,9 @@ public class MesosOptions {
 	/**
 	 * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos.
 	 */
-	public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
-		.defaultValue("")
+	public static final ConfigOption<String> PORT_ASSIGNMENTS =
+		key("mesos.resourcemanager.tasks.port-assignments")
+		.noDefaultValue()
 		.withDescription(Description.builder()
 			.text("Comma-separated list of configuration keys which represent a configurable port. " +
 				"All port keys will dynamically get a port assigned through Mesos.")
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 84ec222..35bea99 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -346,10 +346,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 		final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
-		Arrays.stream(portKeys.split(","))
-			.map(String::trim)
-			.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
-			.forEach(tmPortKeys::add);
+		if (portKeys != null) {
+			Arrays.stream(portKeys.split(","))
+				.map(String::trim)
+				.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+				.forEach(tmPortKeys::add);
+		}
 
 		return tmPortKeys;
 	}
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 6784e42..7fc99d2 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -52,4 +52,19 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		assertEquals("port key must be correct", "anotherport", iterator.next());
 	}
 
+	@Test
+	public void canGetNoPortKeys() {
+		// Setup
+		Configuration config = new Configuration();
+
+		// Act
+		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
+
+		// Assert
+		assertEquals("Must get right number of port keys", 2, portKeys.size());
+		Iterator<String> iterator = portKeys.iterator();
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+	}
+
 }


[flink] 04/04: [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ba8337941fd1f53f217d0fe1dfe749467db9a82
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:50:39 2018 +0100

    [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest
    
    Use Set equality to simplify test assertions.
    
    Change type of field LaunchableMesosWorker#TM_PORT_KEYS to Set<String>.
---
 .../clusterframework/LaunchableMesosWorker.java    |  8 ++++---
 .../LaunchableMesosWorkerTest.java                 | 25 +++++++++++-----------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 1eb5679..637442c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -64,12 +65,13 @@ import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS
 public class LaunchableMesosWorker implements LaunchableTask {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(LaunchableMesosWorker.class);
+
 	/**
 	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
 	 */
-	static final String[] TM_PORT_KEYS = {
+	static final Set<String> TM_PORT_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
 		"taskmanager.rpc.port",
-		"taskmanager.data.port"};
+		"taskmanager.data.port")));
 
 	private final MesosArtifactResolver resolver;
 	private final ContainerSpecification containerSpec;
@@ -342,7 +344,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	 * @return A deterministically ordered Set of port keys to expose from the TM container
 	 */
 	static Set<String> extractPortKeys(Configuration config) {
-		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(TM_PORT_KEYS);
 
 		final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 7fc99d2..48a436c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -23,11 +23,14 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test that mesos config are extracted correctly from the configuration.
@@ -37,19 +40,18 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 	@Test
 	public void canGetPortKeys() {
 		// Setup
+		Set<String> additionalPorts = new HashSet<>(Arrays.asList("someport.here", "anotherport"));
+
 		Configuration config = new Configuration();
-		config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
+		config.setString(PORT_ASSIGNMENTS, String.join(",", additionalPorts));
 
 		// Act
 		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
 
 		// Assert
-		assertEquals("Must get right number of port keys", 4, portKeys.size());
-		Iterator<String> iterator = portKeys.iterator();
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
-		assertEquals("port key must be correct", "someport.here", iterator.next());
-		assertEquals("port key must be correct", "anotherport", iterator.next());
+		Set<String> expectedPorts = new HashSet<>(LaunchableMesosWorker.TM_PORT_KEYS);
+		expectedPorts.addAll(additionalPorts);
+		assertThat(portKeys, is(equalTo(expectedPorts)));
 	}
 
 	@Test
@@ -61,10 +63,7 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
 
 		// Assert
-		assertEquals("Must get right number of port keys", 2, portKeys.size());
-		Iterator<String> iterator = portKeys.iterator();
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+		assertThat(portKeys, is(equalTo(LaunchableMesosWorker.TM_PORT_KEYS)));
 	}
 
 }