You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/09 22:29:40 UTC

[1/2] flink git commit: [FLINK-9261] [security] Fix SSL support for REST API and Web UI.

Repository: flink
Updated Branches:
  refs/heads/release-1.5 e7e1baca8 -> 759555c07


[FLINK-9261] [security] Fix SSL support for REST API and Web UI.

- Remove wrong reuse of SSLEngine instances. SSLEngine must be re-created for
  every SocketChannel initialization.
- Add ChunkedWriteHandler to REST server pipeline because StaticFileServerHandler
  relies on it.
- Add integration tests to verify that SSL can be enabled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a357bede
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a357bede
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a357bede

Branch: refs/heads/release-1.5
Commit: a357bedede566199def38069e9e74ed55b61aefa
Parents: e7e1bac
Author: gyao <ga...@data-artisans.com>
Authored: Wed May 9 13:30:23 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 10 00:28:29 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/net/SSLEngineFactory.java     |  57 +++++++++
 .../org/apache/flink/runtime/net/SSLUtils.java  |  72 +++++++++--
 .../apache/flink/runtime/rest/RestClient.java   |  11 +-
 .../runtime/rest/RestClientConfiguration.java   |  29 ++---
 .../flink/runtime/rest/RestServerEndpoint.java  |  15 ++-
 .../rest/RestServerEndpointConfiguration.java   |  33 +++---
 .../apache/flink/runtime/net/SSLUtilsTest.java  |  28 +++++
 .../runtime/rest/RestServerEndpointITCase.java  | 118 +++++++++++++++----
 8 files changed, 283 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
new file mode 100644
index 0000000..7aca60c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.net;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates and configures {@link SSLEngine} instances.
+ */
+public class SSLEngineFactory {
+
+	private final SSLContext sslContext;
+
+	private final String[] enabledProtocols;
+
+	private final String[] enabledCipherSuites;
+
+	private final boolean clientMode;
+
+	public SSLEngineFactory(
+			final SSLContext sslContext,
+			final String[] enabledProtocols,
+			final String[] enabledCipherSuites,
+			final boolean clientMode) {
+		this.sslContext = requireNonNull(sslContext, "sslContext must not be null");
+		this.enabledProtocols = requireNonNull(enabledProtocols, "enabledProtocols must not be null");
+		this.enabledCipherSuites = requireNonNull(enabledCipherSuites, "cipherSuites must not be null");
+		this.clientMode = clientMode;
+	}
+
+	public SSLEngine createSSLEngine() {
+		final SSLEngine sslEngine = sslContext.createSSLEngine();
+		sslEngine.setEnabledProtocols(enabledProtocols);
+		sslEngine.setEnabledCipherSuites(enabledCipherSuites);
+		sslEngine.setUseClientMode(clientMode);
+		return sslEngine;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 663d221..b574d30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -38,6 +39,9 @@ import java.net.ServerSocket;
 import java.security.KeyStore;
 import java.util.Arrays;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Common utilities to manage SSL transport settings.
  */
@@ -82,15 +86,61 @@ public class SSLUtils {
 	}
 
 	/**
+	 * Creates a {@link SSLEngineFactory} to be used by the Server.
+	 *
+	 * @param config The application configuration.
+	 */
+	public static SSLEngineFactory createServerSSLEngineFactory(final Configuration config) throws Exception {
+		return createSSLEngineFactory(config, false);
+	}
+
+	/**
+	 * Creates a {@link SSLEngineFactory} to be used by the Client.
+	 * @param config The application configuration.
+	 */
+	public static SSLEngineFactory createClientSSLEngineFactory(final Configuration config) throws Exception {
+		return createSSLEngineFactory(config, true);
+	}
+
+	private static SSLEngineFactory createSSLEngineFactory(
+			final Configuration config,
+			final boolean clientMode) throws Exception {
+
+		final SSLContext sslContext = clientMode ?
+			createSSLClientContext(config) :
+			createSSLServerContext(config);
+
+		checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key());
+
+		return new SSLEngineFactory(
+			sslContext,
+			getEnabledProtocols(config),
+			getEnabledCipherSuites(config),
+			clientMode);
+	}
+
+	/**
 	 * Sets SSL version and cipher suites for SSLEngine.
-	 * @param engine
-	 *        SSLEngine to be handled
-	 * @param config
-	 *        The application configuration
+	 *
+	 * @param engine SSLEngine to be handled
+	 * @param config The application configuration
+	 * @deprecated Use {@link #createClientSSLEngineFactory(Configuration)} or
+	 * {@link #createServerSSLEngineFactory(Configuration)}.
 	 */
+	@Deprecated
 	public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) {
-		engine.setEnabledProtocols(config.getString(SecurityOptions.SSL_PROTOCOL).split(","));
-		engine.setEnabledCipherSuites(config.getString(SecurityOptions.SSL_ALGORITHMS).split(","));
+		engine.setEnabledProtocols(getEnabledProtocols(config));
+		engine.setEnabledCipherSuites(getEnabledCipherSuites(config));
+	}
+
+	private static String[] getEnabledProtocols(final Configuration config) {
+		requireNonNull(config, "config must not be null");
+		return config.getString(SecurityOptions.SSL_PROTOCOL).split(",");
+	}
+
+	private static String[] getEnabledCipherSuites(final Configuration config) {
+		requireNonNull(config, "config must not be null");
+		return config.getString(SecurityOptions.SSL_ALGORITHMS).split(",");
 	}
 
 	/**
@@ -122,6 +172,7 @@ public class SSLUtils {
 	 * @throws Exception
 	 *         Thrown if there is any misconfiguration
 	 */
+	@Nullable
 	public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception {
 
 		Preconditions.checkNotNull(sslConfig);
@@ -170,6 +221,7 @@ public class SSLUtils {
 	 * @throws Exception
 	 *         Thrown if there is any misconfiguration
 	 */
+	@Nullable
 	public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception {
 
 		Preconditions.checkNotNull(sslConfig);
@@ -191,14 +243,8 @@ public class SSLUtils {
 			Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
 
 			KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-			FileInputStream keyStoreFile = null;
-			try {
-				keyStoreFile = new FileInputStream(new File(keystoreFilePath));
+			try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
 				ks.load(keyStoreFile, keystorePassword.toCharArray());
-			} finally {
-				if (keyStoreFile != null) {
-					keyStoreFile.close();
-				}
 			}
 
 			// Set up key manager factory to use the server key store

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 8f7dfed..2e812d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -66,8 +67,6 @@ import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFact
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLEngine;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
@@ -93,13 +92,13 @@ public class RestClient {
 		Preconditions.checkNotNull(configuration);
 		this.executor = Preconditions.checkNotNull(executor);
 
-		SSLEngine sslEngine = configuration.getSslEngine();
+		final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 			@Override
-			protected void initChannel(SocketChannel socketChannel) throws Exception {
+			protected void initChannel(SocketChannel socketChannel) {
 				// SSL should be the first handler in the pipeline
-				if (sslEngine != null) {
-					socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				if (sslEngineFactory != null) {
+					socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
 				}
 
 				socketChannel.pipeline()

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 0e98e8f..b1591be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -21,12 +21,12 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -37,18 +37,18 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public final class RestClientConfiguration {
 
 	@Nullable
-	private final SSLEngine sslEngine;
+	private final SSLEngineFactory sslEngineFactory;
 
 	private final long connectionTimeout;
 
 	private final int maxContentLength;
 
 	private RestClientConfiguration(
-			@Nullable final SSLEngine sslEngine,
+			@Nullable final SSLEngineFactory sslEngineFactory,
 			final long connectionTimeout,
 			final int maxContentLength) {
 		checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
-		this.sslEngine = sslEngine;
+		this.sslEngineFactory = sslEngineFactory;
 		this.connectionTimeout = connectionTimeout;
 		this.maxContentLength = maxContentLength;
 	}
@@ -58,9 +58,9 @@ public final class RestClientConfiguration {
 	 *
 	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
 	 */
-
-	public SSLEngine getSslEngine() {
-		return sslEngine;
+	@Nullable
+	public SSLEngineFactory getSslEngineFactory() {
+		return sslEngineFactory;
 	}
 
 	/**
@@ -90,25 +90,22 @@ public final class RestClientConfiguration {
 	public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
 		Preconditions.checkNotNull(config);
 
-		SSLEngine sslEngine = null;
-		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		final SSLEngineFactory sslEngineFactory;
+		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
 		if (enableSSL) {
 			try {
-				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
-				if (sslContext != null) {
-					sslEngine = sslContext.createSSLEngine();
-					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
-					sslEngine.setUseClientMode(false);
-				}
+				sslEngineFactory = SSLUtils.createClientSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
 			}
+		} else {
+			sslEngineFactory = null;
 		}
 
 		final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
 		int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
 
-		return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength);
+		return new RestClientConfiguration(sslEngineFactory, connectionTimeout, maxContentLength);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 15fbbb2..01d1043 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -41,13 +42,13 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCode
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -76,7 +77,8 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 	private final String restAddress;
 	private final String restBindAddress;
 	private final int restBindPort;
-	private final SSLEngine sslEngine;
+	@Nullable
+	private final SSLEngineFactory sslEngineFactory;
 	private final int maxContentLength;
 
 	protected final Path uploadDir;
@@ -96,7 +98,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 		this.restAddress = configuration.getRestAddress();
 		this.restBindAddress = configuration.getRestBindAddress();
 		this.restBindPort = configuration.getRestBindPort();
-		this.sslEngine = configuration.getSslEngine();
+		this.sslEngineFactory = configuration.getSslEngineFactory();
 
 		this.uploadDir = configuration.getUploadDir();
 		createUploadDir(uploadDir, log);
@@ -155,14 +157,15 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 					Handler handler = new RouterHandler(router, responseHeaders);
 
 					// SSL should be the first handler in the pipeline
-					if (sslEngine != null) {
-						ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					if (sslEngineFactory != null) {
+						ch.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
 					}
 
 					ch.pipeline()
 						.addLast(new HttpServerCodec())
 						.addLast(new FileUploadHandler(uploadDir))
 						.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
+						.addLast(new ChunkedWriteHandler())
 						.addLast(handler.name(), handler)
 						.addLast(new PipelineErrorHandler(log, responseHeaders));
 				}
@@ -198,7 +201,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 
 			final String protocol;
 
-			if (sslEngine != null) {
+			if (sslEngineFactory != null) {
 				protocol = "https://";
 			} else {
 				protocol = "http://";

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 542a937..875230f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
@@ -29,7 +30,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import java.nio.file.Path;
@@ -52,7 +52,7 @@ public final class RestServerEndpointConfiguration {
 	private final int restBindPort;
 
 	@Nullable
-	private final SSLEngine sslEngine;
+	private final SSLEngineFactory sslEngineFactory;
 
 	private final Path uploadDir;
 
@@ -64,9 +64,10 @@ public final class RestServerEndpointConfiguration {
 			final String restAddress,
 			@Nullable String restBindAddress,
 			int restBindPort,
-			@Nullable SSLEngine sslEngine,
+			@Nullable SSLEngineFactory sslEngineFactory,
 			final Path uploadDir,
-			final int maxContentLength, final Map<String, String> responseHeaders) {
+			final int maxContentLength,
+			final Map<String, String> responseHeaders) {
 
 		Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
 		Preconditions.checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
@@ -74,7 +75,7 @@ public final class RestServerEndpointConfiguration {
 		this.restAddress = requireNonNull(restAddress);
 		this.restBindAddress = restBindAddress;
 		this.restBindPort = restBindPort;
-		this.sslEngine = sslEngine;
+		this.sslEngineFactory = sslEngineFactory;
 		this.uploadDir = requireNonNull(uploadDir);
 		this.maxContentLength = maxContentLength;
 		this.responseHeaders = Collections.unmodifiableMap(requireNonNull(responseHeaders));
@@ -110,8 +111,9 @@ public final class RestServerEndpointConfiguration {
 	 *
 	 * @return SSLEngine that the REST server endpoint should use, or null if SSL was disabled
 	 */
-	public SSLEngine getSslEngine() {
-		return sslEngine;
+	@Nullable
+	public SSLEngineFactory getSslEngineFactory() {
+		return sslEngineFactory;
 	}
 
 	/**
@@ -154,19 +156,16 @@ public final class RestServerEndpointConfiguration {
 		final String restBindAddress = config.getString(RestOptions.BIND_ADDRESS);
 		final int port = config.getInteger(RestOptions.PORT);
 
-		SSLEngine sslEngine = null;
-		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		final SSLEngineFactory sslEngineFactory;
+		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(WebOptions.SSL_ENABLED);
 		if (enableSSL) {
 			try {
-				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
-				if (sslContext != null) {
-					sslEngine = sslContext.createSSLEngine();
-					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
-					sslEngine.setUseClientMode(false);
-				}
+				sslEngineFactory = SSLUtils.createServerSSLEngineFactory(config);
 			} catch (Exception e) {
-				throw new ConfigurationException("Failed to initialize SSLContext for REST server endpoint.", e);
+				throw new ConfigurationException("Failed to initialize SSLEngineFactory for REST server endpoint.", e);
 			}
+		} else {
+			sslEngineFactory = null;
 		}
 
 		final Path uploadDir = Paths.get(
@@ -183,7 +182,7 @@ public final class RestServerEndpointConfiguration {
 			restAddress,
 			restBindAddress,
 			port,
-			sslEngine,
+			sslEngineFactory,
 			uploadDir,
 			maxContentLength,
 			responseHeaders);

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 87d0ccc..1bf3173 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -29,6 +29,11 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLServerSocket;
 
 import java.net.ServerSocket;
+import java.util.Arrays;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the {@link SSLUtils}.
@@ -227,4 +232,27 @@ public class SSLUtilsTest {
 		Assert.assertTrue(algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
 	}
 
+	/**
+	 * Tests that {@link SSLEngineFactory} is created correctly.
+	 */
+	@Test
+	public void testCreateSSLEngineFactory() throws Exception {
+		Configuration serverConfig = new Configuration();
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
+		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
+
+		final SSLEngineFactory serverSSLEngineFactory = SSLUtils.createServerSSLEngineFactory(serverConfig);
+		final SSLEngine sslEngine = serverSSLEngineFactory.createSSLEngine();
+
+		assertThat(
+			Arrays.asList(sslEngine.getEnabledProtocols()),
+			contains("TLSv1"));
+		assertThat(
+			Arrays.asList(sslEngine.getEnabledCipherSuites()),
+			containsInAnyOrder("TLS_DHE_RSA_WITH_AES_128_CBC_SHA", "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a357bede/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 09e36de..377de08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -23,11 +23,15 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.ConversionException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -45,7 +49,6 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -54,15 +57,20 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.apache.commons.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
+import javax.net.ssl.SSLContext;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -77,10 +85,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static java.util.Objects.requireNonNull;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -95,6 +103,7 @@ import static org.mockito.Mockito.when;
  * IT cases for {@link RestClient} and {@link RestServerEndpoint}.
  */
 @Category(New.class)
+@RunWith(Parameterized.class)
 public class RestServerEndpointITCase extends TestLogger {
 
 	private static final JobID PATH_JOB_ID = new JobID();
@@ -111,14 +120,56 @@ public class RestServerEndpointITCase extends TestLogger {
 	private TestUploadHandler testUploadHandler;
 	private InetSocketAddress serverAddress;
 
-	@Before
-	public void setup() throws Exception {
-		Configuration config = new Configuration();
+	private final Configuration config;
+	private SSLContext defaultSSLContext;
+
+	public RestServerEndpointITCase(final Configuration config) {
+		this.config = requireNonNull(config);
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		final Configuration config = getBaseConfig();
+
+		final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+		final String truststorePath = new File(classLoader
+			.getResource("local127.truststore")
+			.getFile()).getAbsolutePath();
+		final String keystorePath = new File(classLoader
+			.getResource("local127.keystore")
+			.getFile()).getAbsolutePath();
+
+		final Configuration sslConfig = new Configuration(config);
+		sslConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		sslConfig.setString(SecurityOptions.SSL_TRUSTSTORE, truststorePath);
+		sslConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+		sslConfig.setString(SecurityOptions.SSL_KEYSTORE, keystorePath);
+		sslConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		sslConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+
+		return Arrays.asList(new Object[][]{
+			{config}, {sslConfig}
+		});
+	}
+
+	private static Configuration getBaseConfig() {
+		final Configuration config = new Configuration();
 		config.setInteger(RestOptions.PORT, 0);
 		config.setString(RestOptions.ADDRESS, "localhost");
-		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
 		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
 		config.setInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
+		return config;
+	}
+
+	@Before
+	public void setup() throws Exception {
+		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
+
+		defaultSSLContext = SSLContext.getDefault();
+		final SSLContext sslClientContext = SSLUtils.createSSLClientContext(config);
+		if (sslClientContext != null) {
+			SSLContext.setDefault(sslClientContext);
+		}
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
 		RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
@@ -126,8 +177,9 @@ public class RestServerEndpointITCase extends TestLogger {
 		final String restAddress = "http://localhost:1234";
 		RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
 		when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
-		GatewayRetriever<RestfulGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-		when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+
+		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
+			CompletableFuture.completedFuture(mockRestfulGateway);
 
 		TestHandler testHandler = new TestHandler(
 			CompletableFuture.completedFuture(restAddress),
@@ -139,7 +191,18 @@ public class RestServerEndpointITCase extends TestLogger {
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
 
-		serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler, testUploadHandler);
+		final StaticFileServerHandler<RestfulGateway> staticFileServerHandler = new StaticFileServerHandler<>(
+			mockGatewayRetriever,
+			CompletableFuture.completedFuture(restAddress),
+			RpcUtils.INF_TIMEOUT,
+			temporaryFolder.getRoot());
+
+		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
+			Tuple2.of(new TestHeaders(), testHandler),
+			Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler),
+			Tuple2.of(WebContentHandlerSpecification.getInstance(), staticFileServerHandler));
+
+		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
 		restClient = new TestRestClient(clientConfig);
 
 		serverEndpoint.start();
@@ -148,6 +211,8 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	@After
 	public void teardown() throws Exception {
+		SSLContext.setDefault(defaultSSLContext);
+
 		if (restClient != null) {
 			restClient.shutdown(timeout);
 			restClient = null;
@@ -334,6 +399,22 @@ public class RestServerEndpointITCase extends TestLogger {
 		assertEquals(400, connection.getResponseCode());
 	}
 
+	/**
+	 * Tests that files can be served with the {@link StaticFileServerHandler}.
+	 */
+	@Test
+	public void testStaticFileServerHandler() throws Exception {
+		final File file = temporaryFolder.newFile();
+		Files.write(file.toPath(), Collections.singletonList("foobar"));
+
+		final URL url = new URL(serverEndpoint.getRestBaseUrl() + "/" + file.getName());
+		final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+		connection.setRequestMethod("GET");
+		final String fileContents = IOUtils.toString(connection.getInputStream());
+
+		assertEquals("foobar", fileContents.trim());
+	}
+
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
 			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -356,29 +437,22 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	private static class TestRestServerEndpoint extends RestServerEndpoint {
 
-		private final TestHandler testHandler;
-
-		private final TestUploadHandler testUploadHandler;
+		private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
 
 		TestRestServerEndpoint(
-			RestServerEndpointConfiguration configuration,
-			TestHandler testHandler,
-			TestUploadHandler testUploadHandler) throws IOException {
+				RestServerEndpointConfiguration configuration,
+				List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
 			super(configuration);
-
-			this.testHandler = Preconditions.checkNotNull(testHandler);
-			this.testUploadHandler = Preconditions.checkNotNull(testUploadHandler);
+			this.handlers = requireNonNull(handlers);
 		}
 
 		@Override
 		protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
-			return Arrays.asList(
-				Tuple2.of(new TestHeaders(), testHandler),
-				Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler));
+			return handlers;
 		}
 
 		@Override
-		protected void startInternal() throws Exception {}
+		protected void startInternal() {}
 	}
 
 	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {


[2/2] flink git commit: [hotfix] [docs] Use markdown hyperlink instead of writing out the URL.

Posted by se...@apache.org.
[hotfix] [docs] Use markdown hyperlink instead of writing out the URL.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/759555c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/759555c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/759555c0

Branch: refs/heads/release-1.5
Commit: 759555c071a2cdfa0f7fca39bc281df97a59e9fa
Parents: a357bed
Author: gyao <ga...@data-artisans.com>
Authored: Wed May 9 13:41:29 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 10 00:28:32 2018 +0200

----------------------------------------------------------------------
 docs/ops/security-ssl.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/759555c0/docs/ops/security-ssl.md
----------------------------------------------------------------------
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index ceb641b..f43d214 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -35,7 +35,7 @@ SSL can be enabled for all network communication between flink components. SSL k
 
 ## Deploying Keystores and Truststores
 
-You need to have a Java Keystore generated and copied to each node in the flink cluster. The common name or subject alternative names in the certificate should match the node's hostname and IP address. Keystores and truststores can be generated using the keytool utility (https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html). All flink components should have read access to the keystore and truststore files.
+You need to have a Java Keystore generated and copied to each node in the flink cluster. The common name or subject alternative names in the certificate should match the node's hostname and IP address. Keystores and truststores can be generated using the [keytool utility](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html). All flink components should have read access to the keystore and truststore files.
 
 ### Example: Creating self signed CA and keystores for a 2 node cluster