Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/02 21:28:34 UTC

[GitHub] [flink-statefun] FilKarnicki opened a new pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

FilKarnicki opened a new pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306


   ### What is the purpose of the change
   
   The goal of this PR is to add mutual TLS capabilities to flink-statefun. On master, only server-side TLS is currently supported: the flink-statefun job can verify that the remote service it is talking to is who it says it is, but the remote service cannot verify the job. Furthermore, only the default Java certificate authority truststore can currently be used.
   
   This change allows users to specify additional settings in module.yaml's function spec:
   ```yaml
   spec:
     transport:
       type: io.statefun.transports.v1/async
       #(...)
       trust_cacerts: ~/trustedCAs.pem
       client_cert: classpath:clientPublic.crt
       client_key: ~/clientPrivate.key
       client_key_password: changeme
   ```
   Certs and keys are loaded with `ResourceLocator` and used to build Netty's `SslContext`.
   
   ### Main changes are:
   
   - Added a test for `NettyClient`
   - Made slight changes to `HttpConnectionPoolManager` in order to catch failure case responses without them being automatically retried
   - Included a bunch of certificates/keys for testing (with README.md files explaining how the certs were created)
   - Replaced undertow in `statefun-smoke-e2e-java` with `netty` in order to be able to use the same cert/key loading mechanisms as the netty client in `statefun-flink-core`
   
   ### Verifying this change
   `TransportClientTest` launches a netty service returning a stub response. There are three endpoints: HTTP, HTTPS with required mutual TLS, and HTTPS with server-side TLS only (for verifying the existing default JRE truststore use cases).
   
   `NettyClientTest` contains a number of tests used to verify the correct behaviour
   
   `statefun-smoke-e2e-java` was updated to use mutual TLS
   
   Dependencies (does it add or upgrade a dependency): no
   The public API, i.e., is any changed class annotated with @Public(Evolving): N/A
   The serializers: no
   The runtime per-record code paths (performance sensitive): unsure - TLS was already available, so if anyone is already using TLS, they are already experiencing some performance hit
   Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
   The S3 file system connector: no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-statefun] FilKarnicki commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
FilKarnicki commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818290846



##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
##########
@@ -16,5 +16,11 @@
 kind: io.statefun.endpoints.v2/http
 spec:
   functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
-  maxNumBatchRequests: 10000
\ No newline at end of file
+  urlPathTemplate: https://remote-function-host:8000
+  maxNumBatchRequests: 10000
+  transport:
+    type: io.statefun.transports.v1/async
+    trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+    client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+    client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+    client_key_password: test

Review comment:
       I see, I'll make a change. I'm wondering if there's a way to read a pwd from a file while setting up TLS in Kafka. I'll have a poke around later :)







[GitHub] [flink-statefun] FilKarnicki commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
FilKarnicki commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818294287



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+    Optional<String> maybeClientKey = spec.getClientKeyOptional();
+    Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {

Review comment:
       My intention was to write a kind of XOR gate here. Neither cert nor key is fine, both cert and key is fine, but one without the other is not. Let me think about whether I can make this better.
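
For illustration only, the check described in this comment can be collapsed into one comparison. This is a minimal sketch, not the code that was merged: the class `ClientCredentialCheck` and the method `requireBothOrNeither` are hypothetical names, and plain `Optional<String>` values stand in for the spec getters shown in the diff.

```java
import java.util.Optional;

final class ClientCredentialCheck {

  // Hypothetical helper (not part of the PR): accepts "neither" and "both",
  // rejects configurations where exactly one of cert and key is present.
  static void requireBothOrNeither(Optional<String> clientCert, Optional<String> clientKey) {
    // The two isPresent() results differ only when one value is set and the other is missing.
    if (clientCert.isPresent() != clientKey.isPresent()) {
      String provided = clientCert.isPresent() ? "client cert" : "client key";
      String missing = clientCert.isPresent() ? "client key" : "client cert";
      throw new IllegalStateException(
          "You provided a " + provided + ", but not a " + missing + ". Cannot continue.");
    }
  }

  public static void main(String[] args) {
    // Accepted: no client credentials at all (server-side TLS only).
    requireBothOrNeither(Optional.empty(), Optional.empty());
    // Accepted: both cert and key are configured (mutual TLS).
    requireBothOrNeither(Optional.of("a_client.crt"), Optional.of("a_client.key.p8"));
    try {
      // Rejected: a cert without a key.
      requireBothOrNeither(Optional.of("a_client.crt"), Optional.empty());
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A single comparison keeps the two error cases visibly symmetric, at the cost of building the message from two small ternaries.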







[GitHub] [flink-statefun] igalshilman commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818262446



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
##########
@@ -76,14 +92,30 @@ public NettyRequestReplySpec(
             ofNullable(connectTimeout),
             () -> DEFAULT_CONNECT_TIMEOUT);
     this.pooledConnectionTTL =
-        ofNullable(pooledConnectionTTL).orElseGet(() -> DEFAULT_POOLED_CONNECTION_TTL);
+        ofNullable(pooledConnectionTTL).orElse(DEFAULT_POOLED_CONNECTION_TTL);
     this.connectionPoolMaxSize =
         ofNullable(connectionPoolMaxSize).orElse(DEFAULT_CONNECTION_POOL_MAX_SIZE);
     this.maxRequestOrResponseSizeInBytes =
         ofNullable(maxRequestOrResponseSizeInBytes)
             .orElse(DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES);
   }
 
+  public Optional<String> getTrustedCaCertsOptional() {

Review comment:
       I think that the `Optional` suffix is redundant, since it is clear from the return type.

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+    Optional<String> maybeClientKey = spec.getClientKeyOptional();
+    Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {

Review comment:
       This seems to me to be a duplicate of the condition above.

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+    Optional<String> maybeClientKey = spec.getClientKeyOptional();
+    Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client key, but not a client cert. Cannot continue.");
+    }
+
+    Optional<InputStream> maybeTrustCaCertsInputStream =
+        maybeTrustCaCerts.map(
+            trustedCaCertsLocation ->
+                openStreamIfExistsOrThrow(
+                    ResourceLocator.findNamedResource(trustedCaCertsLocation)));
+
+    Optional<InputStream> maybeCertInputStream =
+        maybeClientCerts.map(
+            clientCertLocation ->
+                openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation)));
+
+    Optional<InputStream> maybeKeyInputStream =
+        maybeClientKey.map(
+            clientKeyLocation ->
+                openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation)));
+
+    SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+    maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager);
+    maybeCertInputStream.ifPresent(
+        certInputStream ->
+            maybeKeyInputStream.ifPresent(

Review comment:
       What do you think about strongly indicating that `maybeKeyInputStream` is mandatory, by calling `get` on it (and suppressing the warning)?
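
As a rough sketch of this suggestion, the nested `ifPresent` can be replaced by an unconditional `get` on the key once a cert is known to be present. The names `PairedOptionals` and `ifCertPresent` are hypothetical, and the `@SuppressWarnings` token is an IDE-specific inspection name assumed here for illustration; javac ignores tokens it does not recognise.

```java
import java.util.Optional;
import java.util.function.BiConsumer;

final class PairedOptionals {

  // Hypothetical helper: runs the action only when a cert is configured and treats
  // the key as mandatory in that case. A missing key fails fast with
  // NoSuchElementException instead of being silently skipped.
  @SuppressWarnings("OptionalGetWithoutIsPresent") // assumed IDE inspection name
  static <T> void ifCertPresent(
      Optional<T> maybeCert, Optional<T> maybeKey, BiConsumer<T, T> action) {
    maybeCert.ifPresent(cert -> action.accept(cert, maybeKey.get()));
  }

  public static void main(String[] args) {
    ifCertPresent(
        Optional.of("a_client.crt"),
        Optional.of("a_client.key.p8"),
        (cert, key) -> System.out.println("configuring key manager with " + cert + " and " + key));
  }
}
```

This only reads well if the cert/key pairing has already been validated earlier in the method, so that the `get` documents an invariant rather than introducing a new failure mode.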

##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java
##########
@@ -20,33 +20,120 @@
 
 import static org.apache.flink.statefun.e2e.smoke.java.Constants.CMD_INTERPRETER_FN;
 
-import io.undertow.Undertow;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.*;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ClientAuth;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslProvider;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.StatefulFunctions;
-import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
 
 public class CommandInterpreterAppServer {

Review comment:
       wow, well done! 

##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
##########
@@ -16,5 +16,11 @@
 kind: io.statefun.endpoints.v2/http
 spec:
   functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
-  maxNumBatchRequests: 10000
\ No newline at end of file
+  urlPathTemplate: https://remote-function-host:8000
+  maxNumBatchRequests: 10000
+  transport:
+    type: io.statefun.transports.v1/async
+    trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+    client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+    client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+    client_key_password: test

Review comment:
       While I understand the need for a `client_key_password` to be present, it is a bit problematic, as it puts sensitive information into these yaml files.
   How about we add an additional `client_key_password_file` property, and give people the option to reference it from, say, a mounted k8s secret?
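
One possible shape for the proposed `client_key_password_file` property is sketched below, using only the JDK. The class `KeyPasswordResolver`, its `resolve` method, the precedence rule (file wins over the inline value), and the stripping of trailing line terminators are all assumptions made for illustration; the thread does not specify them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

final class KeyPasswordResolver {

  // Hypothetical helper: returns the password read from the file when one is configured,
  // otherwise falls back to the inline value from module.yaml.
  static Optional<String> resolve(Optional<String> inlinePassword, Optional<Path> passwordFile) {
    if (!passwordFile.isPresent()) {
      return inlinePassword;
    }
    try {
      String content =
          new String(Files.readAllBytes(passwordFile.get()), StandardCharsets.UTF_8);
      // Assumption: drop only trailing line terminators, which editors and secret
      // tooling often append; other whitespace is kept as part of the password.
      return Optional.of(content.replaceAll("[\\r\\n]+\\z", ""));
    } catch (IOException e) {
      throw new UncheckedIOException("Unable to read the client key password file", e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path secret = Files.createTempFile("client_key_password", ".txt");
    Files.write(secret, "test\n".getBytes(StandardCharsets.UTF_8));
    System.out.println(resolve(Optional.empty(), Optional.of(secret)).isPresent());
    Files.delete(secret);
  }
}
```

With a helper like this, the yaml file would carry only a path, and the secret itself could live in a file mounted into the container.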







[GitHub] [flink-statefun] igalshilman commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818359939



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+    Optional<String> maybeClientKey = spec.getClientKeyOptional();
+    Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {

Review comment:
       Oh, I see, that makes sense 👍 







[GitHub] [flink-statefun] FilKarnicki commented on pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
FilKarnicki commented on pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#issuecomment-1059726797


   hi @igalshilman I believe I addressed your concerns. Please let me know if there's anything else. Thanks! Fil





[GitHub] [flink-statefun] igalshilman merged pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
igalshilman merged pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306


   





[GitHub] [flink-statefun] igalshilman commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818360880



##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
##########
@@ -16,5 +16,11 @@
 kind: io.statefun.endpoints.v2/http
 spec:
   functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
-  maxNumBatchRequests: 10000
\ No newline at end of file
+  urlPathTemplate: https://remote-function-host:8000
+  maxNumBatchRequests: 10000
+  transport:
+    type: io.statefun.transports.v1/async
+    trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+    client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+    client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+    client_key_password: test

Review comment:
       I don't think I understand. If you can specify it in a `module.yaml` as a utf8 string, then it should be possible to read the exact same string from a file, no?



