You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/12/03 07:07:15 UTC

[flink] branch release-1.7 updated (13e0736 -> 0ff5f86)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 13e0736  [hotfix] [docs] Fix typos in MATCH_RECOGNIZE docs
     new e82df4f  [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.
     new e3017db  [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders
     new 3108567  [FLINK-10149][mesos] Make returned port keys set immutable.
     new 0ff5f86  [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/mesos/configuration/MesosOptions.java    |  5 ++--
 .../clusterframework/LaunchableMesosWorker.java    | 20 ++++++++------
 .../LaunchableMesosWorkerTest.java                 | 32 ++++++++++++++++------
 3 files changed, 38 insertions(+), 19 deletions(-)


[flink] 03/04: [FLINK-10149][mesos] Make returned port keys set immutable.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3108567f886177cba7ae72ad1dd5124c57987860
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:20:18 2018 +0100

    [FLINK-10149][mesos] Make returned port keys set immutable.
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 81ee795..1eb5679 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -353,7 +353,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 				.forEach(tmPortKeys::add);
 		}
 
-		return tmPortKeys;
+		return Collections.unmodifiableSet(tmPortKeys);
 	}
 
 	@Override


[flink] 01/04: [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e82df4f07f0a4e82410db0d30013b8df32e66720
Author: Rune Skou Larsen <rs...@trifork.com>
AuthorDate: Wed Aug 15 12:33:54 2018 +0200

    [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.
---
 .../apache/flink/mesos/configuration/MesosOptions.java    |  5 +++--
 .../runtime/clusterframework/LaunchableMesosWorker.java   | 10 ++++++----
 .../clusterframework/LaunchableMesosWorkerTest.java       | 15 +++++++++++++++
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 426a891..0c4e1f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -136,8 +136,9 @@ public class MesosOptions {
 	/**
 	 * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos.
 	 */
-	public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
-		.defaultValue("")
+	public static final ConfigOption<String> PORT_ASSIGNMENTS =
+		key("mesos.resourcemanager.tasks.port-assignments")
+		.noDefaultValue()
 		.withDescription(Description.builder()
 			.text("Comma-separated list of configuration keys which represent a configurable port. " +
 				"All port keys will dynamically get a port assigned through Mesos.")
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 84ec222..35bea99 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -346,10 +346,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 		final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
-		Arrays.stream(portKeys.split(","))
-			.map(String::trim)
-			.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
-			.forEach(tmPortKeys::add);
+		if (portKeys != null) {
+			Arrays.stream(portKeys.split(","))
+				.map(String::trim)
+				.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+				.forEach(tmPortKeys::add);
+		}
 
 		return tmPortKeys;
 	}
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 6784e42..7fc99d2 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -52,4 +52,19 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		assertEquals("port key must be correct", "anotherport", iterator.next());
 	}
 
+	@Test
+	public void canGetNoPortKeys() {
+		// Setup
+		Configuration config = new Configuration();
+
+		// Act
+		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
+
+		// Assert
+		assertEquals("Must get right number of port keys", 2, portKeys.size());
+		Iterator<String> iterator = portKeys.iterator();
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+	}
+
 }


[flink] 04/04: [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0ff5f86f78a71ae77ccbe0e9abf5a387e339f67a
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:50:39 2018 +0100

    [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest
    
    Use Set equality to simplify test assertions.
    
    Change type of field LaunchableMesosWorker#TM_PORT_KEYS to Set<String>.
---
 .../clusterframework/LaunchableMesosWorker.java    |  8 ++++---
 .../LaunchableMesosWorkerTest.java                 | 25 +++++++++++-----------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 1eb5679..637442c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -64,12 +65,13 @@ import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS
 public class LaunchableMesosWorker implements LaunchableTask {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(LaunchableMesosWorker.class);
+
 	/**
 	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
 	 */
-	static final String[] TM_PORT_KEYS = {
+	static final Set<String> TM_PORT_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
 		"taskmanager.rpc.port",
-		"taskmanager.data.port"};
+		"taskmanager.data.port")));
 
 	private final MesosArtifactResolver resolver;
 	private final ContainerSpecification containerSpec;
@@ -342,7 +344,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	 * @return A deterministically ordered Set of port keys to expose from the TM container
 	 */
 	static Set<String> extractPortKeys(Configuration config) {
-		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(TM_PORT_KEYS);
 
 		final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 7fc99d2..48a436c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -23,11 +23,14 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test that mesos config are extracted correctly from the configuration.
@@ -37,19 +40,18 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 	@Test
 	public void canGetPortKeys() {
 		// Setup
+		Set<String> additionalPorts = new HashSet<>(Arrays.asList("someport.here", "anotherport"));
+
 		Configuration config = new Configuration();
-		config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
+		config.setString(PORT_ASSIGNMENTS, String.join(",", additionalPorts));
 
 		// Act
 		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
 
 		// Assert
-		assertEquals("Must get right number of port keys", 4, portKeys.size());
-		Iterator<String> iterator = portKeys.iterator();
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
-		assertEquals("port key must be correct", "someport.here", iterator.next());
-		assertEquals("port key must be correct", "anotherport", iterator.next());
+		Set<String> expectedPorts = new HashSet<>(LaunchableMesosWorker.TM_PORT_KEYS);
+		expectedPorts.addAll(additionalPorts);
+		assertThat(portKeys, is(equalTo(expectedPorts)));
 	}
 
 	@Test
@@ -61,10 +63,7 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
 
 		// Assert
-		assertEquals("Must get right number of port keys", 2, portKeys.size());
-		Iterator<String> iterator = portKeys.iterator();
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+		assertThat(portKeys, is(equalTo(LaunchableMesosWorker.TM_PORT_KEYS)));
 	}
 
 }


[flink] 02/04: [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3017db7a54edea22b546d5f427b3f82ade56b25
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:17:14 2018 +0100

    [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 35bea99..81ee795 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -349,7 +349,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		if (portKeys != null) {
 			Arrays.stream(portKeys.split(","))
 				.map(String::trim)
-				.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+				.peek(key -> LOG.debug("Adding port key {} to mesos request"))
 				.forEach(tmPortKeys::add);
 		}