You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/12/03 07:12:15 UTC

[flink] branch release-1.6 updated (4d3574e -> dbba9b0)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4d3574e  [FLINK-11017][table] Throw exception if constant with YEAR TO MONTH resolution was used for group windows
     new b909d4d  [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.
     new 9b7b6af  [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders
     new bec8d7f  [FLINK-10149][mesos] Make returned port keys set immutable.
     new dbba9b0  [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/mesos/configuration/MesosOptions.java    |  5 ++--
 .../clusterframework/LaunchableMesosWorker.java    | 20 ++++++++------
 .../LaunchableMesosWorkerTest.java                 | 32 ++++++++++++++++------
 3 files changed, 38 insertions(+), 19 deletions(-)


[flink] 02/04: [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9b7b6af5fc50cacc5c0b4ab11c78f1046d1e010d
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:17:14 2018 +0100

    [FLINK-10149][mesos] Replace string concatenation with slf4j placeholders
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 35bea99..81ee795 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -349,7 +349,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		if (portKeys != null) {
 			Arrays.stream(portKeys.split(","))
 				.map(String::trim)
-				.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+				.peek(key -> LOG.debug("Adding port key {} to mesos request", key))
 				.forEach(tmPortKeys::add);
 		}
 


[flink] 04/04: [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dbba9b072c001d6ebc852d4bf36424f9353916bb
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:50:39 2018 +0100

    [FLINK-10149][mesos] Simplify assertions in LaunchableMesosWorkerTest
    
    Use Set equality to simplify test assertions.
    
    Change type of field LaunchableMesosWorker#TM_PORT_KEYS to Set<String>.
---
 .../clusterframework/LaunchableMesosWorker.java    |  8 ++++---
 .../LaunchableMesosWorkerTest.java                 | 25 +++++++++++-----------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 1eb5679..637442c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -64,12 +65,13 @@ import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS
 public class LaunchableMesosWorker implements LaunchableTask {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(LaunchableMesosWorker.class);
+
 	/**
 	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
 	 */
-	static final String[] TM_PORT_KEYS = {
+	static final Set<String> TM_PORT_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
 		"taskmanager.rpc.port",
-		"taskmanager.data.port"};
+		"taskmanager.data.port")));
 
 	private final MesosArtifactResolver resolver;
 	private final ContainerSpecification containerSpec;
@@ -342,7 +344,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	 * @return A deterministically ordered Set of port keys to expose from the TM container
 	 */
 	static Set<String> extractPortKeys(Configuration config) {
-		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(TM_PORT_KEYS);
 
 		final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 7fc99d2..48a436c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -23,11 +23,14 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test that mesos config are extracted correctly from the configuration.
@@ -37,19 +40,18 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 	@Test
 	public void canGetPortKeys() {
 		// Setup
+		Set<String> additionalPorts = new HashSet<>(Arrays.asList("someport.here", "anotherport"));
+
 		Configuration config = new Configuration();
-		config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
+		config.setString(PORT_ASSIGNMENTS, String.join(",", additionalPorts));
 
 		// Act
 		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
 
 		// Assert
-		assertEquals("Must get right number of port keys", 4, portKeys.size());
-		Iterator<String> iterator = portKeys.iterator();
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
-		assertEquals("port key must be correct", "someport.here", iterator.next());
-		assertEquals("port key must be correct", "anotherport", iterator.next());
+		Set<String> expectedPorts = new HashSet<>(LaunchableMesosWorker.TM_PORT_KEYS);
+		expectedPorts.addAll(additionalPorts);
+		assertThat(portKeys, is(equalTo(expectedPorts)));
 	}
 
 	@Test
@@ -61,10 +63,7 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
 
 		// Assert
-		assertEquals("Must get right number of port keys", 2, portKeys.size());
-		Iterator<String> iterator = portKeys.iterator();
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+		assertThat(portKeys, is(equalTo(LaunchableMesosWorker.TM_PORT_KEYS)));
 	}
 
 }


[flink] 03/04: [FLINK-10149][mesos] Make returned port keys set immutable.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bec8d7f5c0255dbdcee25cc9a023bb4cf86048be
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Nov 30 11:20:18 2018 +0100

    [FLINK-10149][mesos] Make returned port keys set immutable.
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 81ee795..1eb5679 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -353,7 +353,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 				.forEach(tmPortKeys::add);
 		}
 
-		return tmPortKeys;
+		return Collections.unmodifiableSet(tmPortKeys);
 	}
 
 	@Override


[flink] 01/04: [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b909d4d5bd96e492fe88db7a954b307f99d5c81d
Author: Rune Skou Larsen <rs...@trifork.com>
AuthorDate: Wed Aug 15 12:33:54 2018 +0200

    [FLINK-10149][mesos] Don't allocate extra mesos port for TM unless configured to do so.
---
 .../apache/flink/mesos/configuration/MesosOptions.java    |  5 +++--
 .../runtime/clusterframework/LaunchableMesosWorker.java   | 10 ++++++----
 .../clusterframework/LaunchableMesosWorkerTest.java       | 15 +++++++++++++++
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 426a891..0c4e1f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -136,8 +136,9 @@ public class MesosOptions {
 	/**
 	 * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos.
 	 */
-	public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
-		.defaultValue("")
+	public static final ConfigOption<String> PORT_ASSIGNMENTS =
+		key("mesos.resourcemanager.tasks.port-assignments")
+		.noDefaultValue()
 		.withDescription(Description.builder()
 			.text("Comma-separated list of configuration keys which represent a configurable port. " +
 				"All port keys will dynamically get a port assigned through Mesos.")
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 84ec222..35bea99 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -346,10 +346,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 		final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
-		Arrays.stream(portKeys.split(","))
-			.map(String::trim)
-			.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
-			.forEach(tmPortKeys::add);
+		if (portKeys != null) {
+			Arrays.stream(portKeys.split(","))
+				.map(String::trim)
+				.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+				.forEach(tmPortKeys::add);
+		}
 
 		return tmPortKeys;
 	}
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 6784e42..7fc99d2 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -52,4 +52,19 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		assertEquals("port key must be correct", "anotherport", iterator.next());
 	}
 
+	@Test
+	public void canGetNoPortKeys() {
+		// Setup
+		Configuration config = new Configuration();
+
+		// Act
+		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
+
+		// Assert
+		assertEquals("Must get right number of port keys", 2, portKeys.size());
+		Iterator<String> iterator = portKeys.iterator();
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+	}
+
 }