You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/28 15:32:07 UTC

[flink] branch master updated: [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e0dd372  [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP
e0dd372 is described below

commit e0dd372337fb43e89988cd53129542bff0e86b14
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 22 16:27:47 2022 +0100

    [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP
    
    Tests that extend from UnalignedCheckpointTestBase create a lot of
    MiniClusters. E.g. the rescale IT case creates 72 tests * 2 clusters
    (pre & post rescale). Direct buffers allocated by netty are freed during
    the GC.
    
    At the same time Flink uses PooledBufferAllocator, where we return used
    buffers earlier and we do not need to wait for GC to kick in. The idea
    to make the test more stable is to reuse a single NettyBufferPool for
    all clusters that are started in those tests. That way we can reuse
    buffers that were previously allocated and we do not need to wait until
    they are freed.
    
    Lastly, as a note: this should not be an issue in production setups, as
    we do not start multiple shuffle environments in a single JVM process
    (TM).
---
 .../io/network/NettyShuffleServiceFactory.java     |  42 ++++++---
 .../io/network/netty/NettyConnectionManager.java   |  21 ++++-
 .../SharedPoolNettyShuffleServiceFactory.java      | 104 +++++++++++++++++++++
 .../checkpointing/UnalignedCheckpointTestBase.java |  47 +++++-----
 4 files changed, 175 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index e50b8e6..05e3932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -108,13 +108,41 @@ public class NettyShuffleServiceFactory
             ResultPartitionManager resultPartitionManager,
             MetricGroup metricGroup,
             Executor ioExecutor) {
+        NettyConfig nettyConfig = config.nettyConfig();
+        ConnectionManager connectionManager =
+                nettyConfig != null
+                        ? new NettyConnectionManager(
+                                resultPartitionManager,
+                                taskEventPublisher,
+                                nettyConfig,
+                                config.getMaxNumberOfConnections(),
+                                config.isConnectionReuseEnabled())
+                        : new LocalConnectionManager();
+        return createNettyShuffleEnvironment(
+                config,
+                taskExecutorResourceId,
+                taskEventPublisher,
+                resultPartitionManager,
+                connectionManager,
+                metricGroup,
+                ioExecutor);
+    }
+
+    @VisibleForTesting
+    public static NettyShuffleEnvironment createNettyShuffleEnvironment(
+            NettyShuffleEnvironmentConfiguration config,
+            ResourceID taskExecutorResourceId,
+            TaskEventPublisher taskEventPublisher,
+            ResultPartitionManager resultPartitionManager,
+            ConnectionManager connectionManager,
+            MetricGroup metricGroup,
+            Executor ioExecutor) {
         checkNotNull(config);
         checkNotNull(taskExecutorResourceId);
         checkNotNull(taskEventPublisher);
         checkNotNull(resultPartitionManager);
         checkNotNull(metricGroup);
-
-        NettyConfig nettyConfig = config.nettyConfig();
+        checkNotNull(connectionManager);
 
         FileChannelManager fileChannelManager =
                 new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
@@ -127,16 +155,6 @@ public class NettyShuffleServiceFactory
                             .collect(Collectors.joining("\n\t")));
         }
 
-        ConnectionManager connectionManager =
-                nettyConfig != null
-                        ? new NettyConnectionManager(
-                                resultPartitionManager,
-                                taskEventPublisher,
-                                nettyConfig,
-                                config.getMaxNumberOfConnections(),
-                                config.isConnectionReuseEnabled())
-                        : new LocalConnectionManager();
-
         NetworkBufferPool networkBufferPool =
                 new NetworkBufferPool(
                         config.numNetworkBuffers(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 84de0cb..a240172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -47,9 +48,27 @@ public class NettyConnectionManager implements ConnectionManager {
             int maxNumberOfConnections,
             boolean connectionReuseEnabled) {
 
+        this(
+                new NettyBufferPool(nettyConfig.getNumberOfArenas()),
+                partitionProvider,
+                taskEventPublisher,
+                nettyConfig,
+                maxNumberOfConnections,
+                connectionReuseEnabled);
+    }
+
+    @VisibleForTesting
+    public NettyConnectionManager(
+            NettyBufferPool bufferPool,
+            ResultPartitionProvider partitionProvider,
+            TaskEventPublisher taskEventPublisher,
+            NettyConfig nettyConfig,
+            int maxNumberOfConnections,
+            boolean connectionReuseEnabled) {
+
         this.server = new NettyServer(nettyConfig);
         this.client = new NettyClient(nettyConfig);
-        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
+        this.bufferPool = checkNotNull(bufferPool);
 
         this.partitionRequestClientFactory =
                 new PartitionRequestClientFactory(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
new file mode 100644
index 0000000..e22187c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
+import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A variation on {@link NettyShuffleServiceFactory} that uses a single {@link NettyBufferPool} for
+ * all created {@link ShuffleEnvironment environments}.
+ *
+ * <p>Used in {@link UnalignedCheckpointTestBase}.
+ */
+public final class SharedPoolNettyShuffleServiceFactory
+        implements ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> {
+
+    private final NettyShuffleServiceFactory nettyShuffleServiceFactory =
+            new NettyShuffleServiceFactory();
+
+    private static NettyBufferPool bufferPool;
+
+    public static void resetBufferPool(int numberOfArenas) {
+        bufferPool = new NettyBufferPool(numberOfArenas);
+    }
+
+    public static void clearBufferPool() {
+        bufferPool = null;
+    }
+
+    @Override
+    public ShuffleMaster<NettyShuffleDescriptor> createShuffleMaster(
+            ShuffleMasterContext shuffleMasterContext) {
+        return nettyShuffleServiceFactory.createShuffleMaster(shuffleMasterContext);
+    }
+
+    @Override
+    public ShuffleEnvironment<ResultPartition, SingleInputGate> createShuffleEnvironment(
+            ShuffleEnvironmentContext shuffleEnvironmentContext) {
+
+        checkNotNull(shuffleEnvironmentContext);
+        NettyShuffleEnvironmentConfiguration networkConfig =
+                NettyShuffleEnvironmentConfiguration.fromConfiguration(
+                        shuffleEnvironmentContext.getConfiguration(),
+                        shuffleEnvironmentContext.getNetworkMemorySize(),
+                        shuffleEnvironmentContext.isLocalCommunicationOnly(),
+                        shuffleEnvironmentContext.getHostAddress());
+
+        final NettyConfig nettyConfig = networkConfig.nettyConfig();
+        final TaskEventPublisher taskEventPublisher = shuffleEnvironmentContext.getEventPublisher();
+        final ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+        final ConnectionManager connectionManager =
+                nettyConfig != null
+                        ? new NettyConnectionManager(
+                                bufferPool,
+                                resultPartitionManager,
+                                taskEventPublisher,
+                                nettyConfig,
+                                networkConfig.getMaxNumberOfConnections(),
+                                networkConfig.isConnectionReuseEnabled())
+                        : new LocalConnectionManager();
+
+        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
+                networkConfig,
+                shuffleEnvironmentContext.getTaskExecutorResourceId(),
+                taskEventPublisher,
+                resultPartitionManager,
+                connectionManager,
+                shuffleEnvironmentContext.getParentMetricGroup(),
+                shuffleEnvironmentContext.getIoExecutor());
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index bdfba39..d4a344b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -51,6 +51,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -69,8 +70,9 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ErrorCollector;
@@ -123,6 +125,20 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
     @Rule public TestName name = new TestName();
 
+    @BeforeClass
+    public static void beforeAll() {
+        // Set to a high enough number: it is recommended to have as many arenas as slots.
+        // If a single buffer pool is shared between all TMs we should have TMs * slots_per_tm.
+        // This should be the maximum across all tests run.
+        SharedPoolNettyShuffleServiceFactory.resetBufferPool(60);
+    }
+
+    @AfterClass
+    public static void afterAll() {
+        // safety precaution, make sure the buffer pool can be cleared by the GC
+        SharedPoolNettyShuffleServiceFactory.clearBufferPool();
+    }
+
     @Nullable
     protected File execute(UnalignedSettings settings) throws Exception {
         final File checkpointDir = temp.newFolder();
@@ -140,6 +156,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                         .mapToInt(node -> node.getParallelism())
                         .reduce(0, settings.channelType.slotSharing ? Integer::max : Integer::sum);
         int numberTaskmanagers = settings.channelType.slotsToTaskManagers.apply(requiredSlots);
+
         final int slotsPerTM = (requiredSlots + numberTaskmanagers - 1) / numberTaskmanagers;
         final MiniClusterWithClientResource miniCluster =
                 new MiniClusterWithClientResource(
@@ -156,7 +173,6 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             // print the test parameters to help debugging when the case is stuck
             System.out.println(
                     "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
-            waitForCleanShutdown();
             final CompletableFuture<JobSubmissionResult> result =
                     miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
 
@@ -196,30 +212,6 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         return setupEnv.getStreamGraph();
     }
 
-    private void waitForCleanShutdown() throws InterruptedException {
-        // direct memory in netty will be freed through gc/finalization
-        // too many successive executions will lead to OOM by netty
-        // slow down when half the memory is taken and wait for gc
-        if (PlatformDependent.usedDirectMemory() > PlatformDependent.maxDirectMemory() / 2) {
-            final Duration waitTime = Duration.ofSeconds(10);
-            Deadline deadline = Deadline.fromNow(waitTime);
-            while (PlatformDependent.usedDirectMemory() > 0 && deadline.hasTimeLeft()) {
-                System.gc();
-                Thread.sleep(100);
-            }
-            final Duration timeLeft = deadline.timeLeft();
-            if (timeLeft.isNegative()) {
-                LOG.warn(
-                        "Waited 10s for clean shutdown of previous runs but there is still direct memory in use: "
-                                + PlatformDependent.usedDirectMemory());
-            } else {
-                LOG.info(
-                        "Needed to wait {} ms for full cleanup of previous runs.",
-                        waitTime.minus(timeLeft).toMillis());
-            }
-        }
-    }
-
     protected abstract void checkCounters(JobExecutionResult result);
 
     /** A source that generates longs in a fixed number of splits. */
@@ -763,6 +755,9 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                         restoreCheckpoint.toURI().toString());
             }
 
+            conf.set(
+                    ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS,
+                    "org.apache.flink.test.checkpointing.SharedPoolNettyShuffleServiceFactory");
             conf.set(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);
             conf.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 60000);
             conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));