You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/11/30 09:39:27 UTC

[flink] branch release-1.16 updated (73dfd61858b -> 519aeb2d190)

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 73dfd61858b [FLINK-29092][Connectors/Test] Use one bucket assigner
     new 05c64f1758c [FLINK-29155][python] Port Beam ServerFactory class to flink-python module
     new 6d66200d572 [FLINK-29155][python] Change default KEEP_ALIVE_TIME_SEC to 19
     new 519aeb2d190 [FLINK-29155][python] Adjust the default value of bundle size and arrow batch size

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../shortcodes/generated/python_configuration.html |   4 +-
 .../apache/beam/sdk/fn/server/ServerFactory.java   | 303 +++++++++++++++++++++
 .../org/apache/flink/python/PythonOptions.java     |   4 +-
 3 files changed, 307 insertions(+), 4 deletions(-)
 create mode 100644 flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java


[flink] 02/03: [FLINK-29155][python] Change default KEEP_ALIVE_TIME_SEC to 19

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d66200d5722aa5f33a32de801259eb16f095d15
Author: huangxingbo <hx...@apache.org>
AuthorDate: Tue Nov 22 14:55:46 2022 +0800

    [FLINK-29155][python] Change default KEEP_ALIVE_TIME_SEC to 19
    
    The BDP ping period is decided locally and the client-side keep-alive time is 20 seconds,
    so the server is configured to permit client pings as often as every 19 seconds.
    
    This closes #21363.
---
 .../main/java/org/apache/beam/sdk/fn/server/ServerFactory.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java b/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
index f2566321b78..db55b8cbcd9 100644
--- a/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
+++ b/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
@@ -42,10 +42,17 @@ import java.util.function.Supplier;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
+// This class is copied from Beam's org.apache.beam.sdk.fn.server.ServerFactory and
+// can be removed once https://github.com/apache/beam/issues/21598 is fixed.
+//
+// Changed lines: 53~55
+
 /** A {@link Server gRPC server} factory. */
 public abstract class ServerFactory {
 
-    private static final int KEEP_ALIVE_TIME_SEC = 20;
+    // The BDP ping period is locally-decided and the keep alive time is 20 seconds in client
+    // side, so we choose the server to allow pings every 19 seconds.
+    private static final int KEEP_ALIVE_TIME_SEC = 19;
 
     /** Create a default {@link InetSocketAddressServerFactory}. */
     public static ServerFactory createDefault() {


[flink] 01/03: [FLINK-29155][python] Port Beam ServerFactory class to flink-python module

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 05c64f1758c94291261a1edbe8f10a37b6b11fec
Author: huangxingbo <hx...@apache.org>
AuthorDate: Tue Nov 22 14:52:02 2022 +0800

    [FLINK-29155][python] Port Beam ServerFactory class to flink-python module
    
    This closes #21363.
---
 .../apache/beam/sdk/fn/server/ServerFactory.java   | 296 +++++++++++++++++++++
 1 file changed, 296 insertions(+)

diff --git a/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java b/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
new file mode 100644
index 00000000000..f2566321b78
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.server;
+
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.BindableService;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ServerBuilder;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ServerInterceptors;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.netty.NettyServerBuilder;
+import org.apache.beam.vendor.grpc.v1p43p2.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.beam.vendor.grpc.v1p43p2.io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.netty.channel.unix.DomainSocketAddress;
+import org.apache.beam.vendor.grpc.v1p43p2.io.netty.util.internal.ThreadLocalRandom;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/** A {@link Server gRPC server} factory. */
+public abstract class ServerFactory {
+
+    private static final int KEEP_ALIVE_TIME_SEC = 20;
+
+    /** Create a default {@link InetSocketAddressServerFactory}. */
+    public static ServerFactory createDefault() {
+        return new InetSocketAddressServerFactory(UrlFactory.createDefault());
+    }
+
+    /** Create an {@link InetSocketAddressServerFactory} that uses the given URL factory. */
+    public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
+        return new InetSocketAddressServerFactory(urlFactory);
+    }
+
+    /** Create an {@link InetSocketAddressServerFactory} that uses ports from a supplier. */
+    public static ServerFactory createWithPortSupplier(Supplier<Integer> portSupplier) {
+        return new InetSocketAddressServerFactory(UrlFactory.createDefault(), portSupplier);
+    }
+
+    /**
+     * Create an {@link InetSocketAddressServerFactory} that uses the given URL factory and ports
+     * from a supplier.
+     */
+    public static ServerFactory createWithUrlFactoryAndPortSupplier(
+            UrlFactory urlFactory, Supplier<Integer> portSupplier) {
+        return new InetSocketAddressServerFactory(urlFactory, portSupplier);
+    }
+
+    /** Create an {@link EpollSocket} server factory. */
+    public static ServerFactory createEpollSocket() {
+        return new EpollSocket();
+    }
+
+    /** Create an {@link EpollDomainSocket} server factory. */
+    public static ServerFactory createEpollDomainSocket() {
+        return new EpollDomainSocket();
+    }
+
+    /**
+     * Creates an instance of this server using an ephemeral address. The allocation of the address
+     * is server-type dependent: the address may be a port for some types of server, or a file path
+     * for others (e.g. Unix domain sockets). The chosen address is accessible to the caller from
+     * the URL set in the input {@link Endpoints.ApiServiceDescriptor.Builder}. Server applies
+     * {@link GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
+     */
+    public abstract Server allocateAddressAndCreate(
+            List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder builder)
+            throws IOException;
+
+    /**
+     * Creates an instance of this server at the address specified by the given service descriptor
+     * and bound to multiple services. Server applies {@link
+     * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
+     */
+    public abstract Server create(
+            List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+            throws IOException;
+    /**
+     * Creates a {@link Server gRPC Server} using the default server factory.
+     *
+     * <p>Allocated servers use a port from the port supplier (by default 0, i.e. any free port).
+     */
+    public static class InetSocketAddressServerFactory extends ServerFactory {
+        private final UrlFactory urlFactory;
+        private final Supplier<Integer> portSupplier;
+
+        private InetSocketAddressServerFactory(UrlFactory urlFactory) {
+            this(urlFactory, () -> 0);
+        }
+
+        private InetSocketAddressServerFactory(
+                UrlFactory urlFactory, Supplier<Integer> portSupplier) {
+            this.urlFactory = urlFactory;
+            this.portSupplier = portSupplier;
+        }
+
+        @Override
+        public Server allocateAddressAndCreate(
+                List<BindableService> services,
+                Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+                throws IOException {
+            InetSocketAddress address =
+                    new InetSocketAddress(InetAddress.getLoopbackAddress(), portSupplier.get());
+            Server server = createServer(services, address);
+            apiServiceDescriptor.setUrl(
+                    urlFactory.createUrl(address.getHostName(), server.getPort()));
+            return server;
+        }
+
+        @Override
+        public Server create(
+                List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+                throws IOException {
+            SocketAddress socketAddress =
+                    SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+            checkArgument(
+                    socketAddress instanceof InetSocketAddress,
+                    "%s %s requires a host:port socket address, got %s",
+                    getClass().getSimpleName(),
+                    ServerFactory.class.getSimpleName(),
+                    serviceDescriptor.getUrl());
+            return createServer(services, (InetSocketAddress) socketAddress);
+        }
+
+        private static Server createServer(List<BindableService> services, InetSocketAddress socket)
+                throws IOException {
+            NettyServerBuilder builder =
+                    NettyServerBuilder.forPort(socket.getPort())
+                            // Set the message size to max value here. The actual size is governed
+                            // by the buffer size in the layers above. NOTE(review): forPort() gets
+                            // only the port, so the host part of `socket` is not used when binding.
+                            .maxMessageSize(Integer.MAX_VALUE)
+                            .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
+            services.stream()
+                    .forEach(
+                            service ->
+                                    builder.addService(
+                                            ServerInterceptors.intercept(
+                                                    service,
+                                                    GrpcContextHeaderAccessorProvider
+                                                            .interceptor())));
+            return builder.build().start();
+        }
+    }
+
+    /**
+     * Creates a {@link Server gRPC Server} using a Unix domain socket. Note that this requires <a
+     * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
+     * to provide a {@link EpollServerDomainSocketChannel}.
+     *
+     * <p>The Unix domain socket is located at ${java.io.tmpdir}/fnapi${random[0-10000)}.sock
+     */
+    private static class EpollDomainSocket extends ServerFactory {
+        private static File chooseRandomTmpFile(int port) {
+            return new File(
+                    System.getProperty("java.io.tmpdir"), String.format("fnapi%d.sock", port));
+        }
+
+        @Override
+        public Server allocateAddressAndCreate(
+                List<BindableService> services,
+                Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+                throws IOException {
+            File tmp;
+            do {
+                tmp = chooseRandomTmpFile(ThreadLocalRandom.current().nextInt(10000));
+            } while (tmp.exists());
+            apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
+            return create(services, apiServiceDescriptor.build());
+        }
+
+        @Override
+        public Server create(
+                List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+                throws IOException {
+            SocketAddress socketAddress =
+                    SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+            checkArgument(
+                    socketAddress instanceof DomainSocketAddress,
+                    "%s requires a Unix domain socket address, got %s",
+                    EpollDomainSocket.class.getSimpleName(),
+                    serviceDescriptor.getUrl());
+            return createServer(services, (DomainSocketAddress) socketAddress);
+        }
+
+        private static Server createServer(
+                List<BindableService> services, DomainSocketAddress domainSocket)
+                throws IOException {
+            NettyServerBuilder builder =
+                    NettyServerBuilder.forAddress(domainSocket)
+                            .channelType(EpollServerDomainSocketChannel.class)
+                            .workerEventLoopGroup(new EpollEventLoopGroup())
+                            .bossEventLoopGroup(new EpollEventLoopGroup())
+                            .maxMessageSize(Integer.MAX_VALUE)
+                            .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
+            for (BindableService service : services) {
+                // Wrap the service to extract headers
+                builder.addService(
+                        ServerInterceptors.intercept(
+                                service, GrpcContextHeaderAccessorProvider.interceptor()));
+            }
+            return builder.build().start();
+        }
+    }
+
+    /**
+     * Creates a {@link Server gRPC Server} using an Epoll socket. Note that this requires <a
+     * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
+     * to provide a {@link EpollServerSocketChannel}.
+     *
+     * <p>Allocated servers listen on any free port (port 0) of the loopback address.
+     */
+    private static class EpollSocket extends ServerFactory {
+        @Override
+        public Server allocateAddressAndCreate(
+                List<BindableService> services,
+                Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+                throws IOException {
+            InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+            Server server = createServer(services, address);
+            apiServiceDescriptor.setUrl(
+                    HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
+            return server;
+        }
+
+        @Override
+        public Server create(
+                List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+                throws IOException {
+            SocketAddress socketAddress =
+                    SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+            checkArgument(
+                    socketAddress instanceof InetSocketAddress,
+                    "%s requires a host:port socket address, got %s",
+                    EpollSocket.class.getSimpleName(),
+                    serviceDescriptor.getUrl());
+            return createServer(services, (InetSocketAddress) socketAddress);
+        }
+
+        private static Server createServer(List<BindableService> services, InetSocketAddress socket)
+                throws IOException {
+            ServerBuilder builder =
+                    NettyServerBuilder.forAddress(socket)
+                            .channelType(EpollServerSocketChannel.class)
+                            .workerEventLoopGroup(new EpollEventLoopGroup())
+                            .bossEventLoopGroup(new EpollEventLoopGroup())
+                            .maxMessageSize(Integer.MAX_VALUE)
+                            .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
+            for (BindableService service : services) {
+                // Wrap the service to extract headers
+                builder.addService(
+                        ServerInterceptors.intercept(
+                                service, GrpcContextHeaderAccessorProvider.interceptor()));
+            }
+            return builder.build().start();
+        }
+    }
+
+    /**
+     * Factory that constructs client-accessible URLs from a local server address and port.
+     * Necessary when clients access server from a different networking context.
+     */
+    @FunctionalInterface
+    public interface UrlFactory {
+        String createUrl(String address, int port);
+
+        static UrlFactory createDefault() {
+            return (host, port) -> HostAndPort.fromParts(host, port).toString();
+        }
+    }
+}


[flink] 03/03: [FLINK-29155][python] Adjust the default value of bundle size and arrow batch size

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 519aeb2d190a0584c90279ebb872e6d0a94a94d7
Author: huangxingbo <hx...@apache.org>
AuthorDate: Tue Nov 22 14:59:29 2022 +0800

    [FLINK-29155][python] Adjust the default value of bundle size and arrow batch size
    
    The default bundle size was 100000. In most scenarios this value is too
    large and can easily destabilize the gRPC server, so we lower the default
    to 1000. The default arrow batch size is likewise lowered from 10000 to
    1000 so that it does not exceed the new default bundle size.
    
    This closes #21363.
---
 docs/layouts/shortcodes/generated/python_configuration.html           | 4 ++--
 flink-python/src/main/java/org/apache/flink/python/PythonOptions.java | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index a6b5779a977..a7421bf6949 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -40,13 +40,13 @@
         </tr>
         <tr>
             <td><h5>python.fn-execution.arrow.batch.size</h5></td>
-            <td style="word-wrap: break-word;">10000</td>
+            <td style="word-wrap: break-word;">1000</td>
             <td>Integer</td>
             <td>The maximum number of elements to include in an arrow batch for Python user-defined function execution. The arrow batch size should not exceed the bundle size. Otherwise, the bundle size will be used as the arrow batch size.</td>
         </tr>
         <tr>
             <td><h5>python.fn-execution.bundle.size</h5></td>
-            <td style="word-wrap: break-word;">100000</td>
+            <td style="word-wrap: break-word;">1000</td>
             <td>Integer</td>
             <td>The maximum number of elements to include in a bundle for Python user-defined function execution. The elements are processed asynchronously. One bundle of elements are processed before processing the next bundle of elements. A larger value can improve the throughput, but at the cost of more memory usage and higher latency.</td>
         </tr>
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index ec3ef9c1e0b..71f3f4744f7 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -36,7 +36,7 @@ public class PythonOptions {
     public static final ConfigOption<Integer> MAX_BUNDLE_SIZE =
             ConfigOptions.key("python.fn-execution.bundle.size")
                     .intType()
-                    .defaultValue(100000)
+                    .defaultValue(1000)
                     .withDescription(
                             "The maximum number of elements to include in a bundle for Python "
                                     + "user-defined function execution. The elements are processed asynchronously. "
@@ -57,7 +57,7 @@ public class PythonOptions {
     public static final ConfigOption<Integer> MAX_ARROW_BATCH_SIZE =
             ConfigOptions.key("python.fn-execution.arrow.batch.size")
                     .intType()
-                    .defaultValue(10000)
+                    .defaultValue(1000)
                     .withDescription(
                             "The maximum number of elements to include in an arrow batch for Python "
                                     + "user-defined function execution. The arrow batch size should not exceed the "