You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/12/01 13:12:07 UTC

[flink] branch master updated (c65591d4109 -> 20808fde1ca)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from c65591d4109 [FLINK-30138][runtime][test] Increases timeout to essentially wait forever for an expected Exception to be thrown
     new d145febe96f [hotfix] Migrate NettyShuffleEnvironmentTest to Junit5 and AssertJ.
     new 20808fde1ca [FLINK-27944][runtime] Input channel metric will no longer be registered multiple times

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/NettyShuffleEnvironment.java        |  10 +-
 .../partition/consumer/SingleInputGateFactory.java |   5 +-
 .../io/network/NettyShuffleEnvironmentTest.java    | 132 ++++++++++++++-------
 .../partition/consumer/SingleInputGateTest.java    |   4 +-
 .../StreamNetworkBenchmarkEnvironment.java         |   4 +-
 5 files changed, 103 insertions(+), 52 deletions(-)


[flink] 02/02: [FLINK-27944][runtime] Input channel metric will no longer be registered multiple times

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20808fde1ca7abf6afeeb9cda9586df4bbb95a85
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Dec 1 02:03:17 2022 +0800

    [FLINK-27944][runtime] Input channel metric will no longer be registered multiple times
    
    Co-authored-by: xiaogang <zh...@shizhuang-inc.com>
---
 .../io/network/NettyShuffleEnvironment.java        | 10 ++++-
 .../partition/consumer/SingleInputGateFactory.java |  5 +--
 .../io/network/NettyShuffleEnvironmentTest.java    | 45 ++++++++++++++++++++++
 .../partition/consumer/SingleInputGateTest.java    |  4 +-
 .../StreamNetworkBenchmarkEnvironment.java         |  4 +-
 5 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index ebfc929b148..2faff9650d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory;
 import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -250,6 +251,9 @@ public class NettyShuffleEnvironment
 
             MetricGroup networkInputGroup = ownerContext.getInputGroup();
 
+            InputChannelMetrics inputChannelMetrics =
+                    new InputChannelMetrics(networkInputGroup, ownerContext.getParentGroup());
+
             SingleInputGate[] inputGates =
                     new SingleInputGate[inputGateDeploymentDescriptors.size()];
             for (int gateIndex = 0; gateIndex < inputGates.length; gateIndex++) {
@@ -257,7 +261,11 @@ public class NettyShuffleEnvironment
                         inputGateDeploymentDescriptors.get(gateIndex);
                 SingleInputGate inputGate =
                         singleInputGateFactory.create(
-                                ownerContext, gateIndex, igdd, partitionProducerStateProvider);
+                                ownerContext,
+                                gateIndex,
+                                igdd,
+                                partitionProducerStateProvider,
+                                inputChannelMetrics);
                 InputGateID id =
                         new InputGateID(
                                 igdd.getConsumedResultId(), ownerContext.getExecutionAttemptID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index d7d4b0108c4..d4a6423f473 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -116,7 +116,8 @@ public class SingleInputGateFactory {
             @Nonnull ShuffleIOOwnerContext owner,
             int gateIndex,
             @Nonnull InputGateDeploymentDescriptor igdd,
-            @Nonnull PartitionProducerStateProvider partitionProducerStateProvider) {
+            @Nonnull PartitionProducerStateProvider partitionProducerStateProvider,
+            @Nonnull InputChannelMetrics metrics) {
         SupplierWithException<BufferPool, IOException> bufferPoolFactory =
                 createBufferPoolFactory(networkBufferPool, floatingNetworkBuffersPerGate);
 
@@ -148,8 +149,6 @@ public class SingleInputGateFactory {
                         maybeCreateBufferDebloater(
                                 owningTaskName, gateIndex, networkInputGroup.addGroup(gateIndex)));
 
-        InputChannelMetrics metrics =
-                new InputChannelMetrics(networkInputGroup, owner.getParentGroup());
         createInputChannels(owningTaskName, igdd, inputGate, subpartitionIndexRange, metrics);
         return inputGate;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index 18c34e0f569..7aea39512ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -47,7 +47,9 @@ import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -58,8 +60,11 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -196,6 +201,46 @@ class NettyShuffleEnvironmentTest {
         }
     }
 
+    @Test
+    void testInputChannelMetricsOnlyRegisterOnce() throws IOException {
+        try (NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build()) {
+            Map<String, Integer> metricRegisteredCounter = new HashMap<>();
+            InterceptingTaskMetricGroup taskMetricGroup =
+                    new InterceptingTaskMetricGroup() {
+                        @Override
+                        protected void addMetric(String name, Metric metric) {
+                            metricRegisteredCounter.compute(
+                                    name,
+                                    (metricName, registerCount) ->
+                                            registerCount == null ? 1 : registerCount + 1);
+                            super.addMetric(name, metric);
+                        }
+                    };
+            ShuffleIOOwnerContext ownerContext =
+                    environment.createShuffleIOOwnerContext(
+                            "faker owner", createExecutionAttemptId(), taskMetricGroup);
+            final int numberOfGates = 3;
+            List<InputGateDeploymentDescriptor> gateDeploymentDescriptors = new ArrayList<>();
+            IntermediateDataSetID[] ids = new IntermediateDataSetID[numberOfGates];
+            for (int i = 0; i < numberOfGates; i++) {
+                ids[i] = new IntermediateDataSetID();
+                gateDeploymentDescriptors.add(
+                        new InputGateDeploymentDescriptor(
+                                ids[i],
+                                ResultPartitionType.PIPELINED,
+                                0,
+                                new ShuffleDescriptor[] {
+                                    NettyShuffleDescriptorBuilder.newBuilder().buildRemote()
+                                }));
+            }
+
+            environment.createInputGates(
+                    ownerContext, (ignore1, ignore2, ignore3) -> {}, gateDeploymentDescriptors);
+            // all metric should only be registered once.
+            assertThat(metricRegisteredCounter).allSatisfy((k, v) -> assertThat(v).isOne());
+        }
+    }
+
     private Metric getDebloatingMetric(Map<String, Metric> metrics, int i, String metricName) {
         final String inputScope = "taskmanager.job.task.Shuffle.Netty.Input";
         return metrics.get(inputScope + "." + i + "." + metricName);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c5757cba4b2..2604d570a70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -94,6 +94,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.cr
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultSubpartitionView;
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.newUnregisteredInputChannelMetrics;
 import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
@@ -1227,7 +1228,8 @@ public class SingleInputGateTest extends InputGateTestBase {
                                 "TestTask", taskMetricGroup.executionId(), taskMetricGroup),
                         0,
                         gateDesc,
-                        SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER);
+                        SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
+                        newUnregisteredInputChannelMetrics());
     }
 
     private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index c52f241ac98..2b74faf3006 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -51,6 +51,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.newUnregisteredInputChannelMetrics;
 import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
 
 /**
@@ -274,7 +275,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
                                 taskMetricGroup),
                         gateIndex,
                         gateDescriptor,
-                        SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER);
+                        SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
+                        newUnregisteredInputChannelMetrics());
 
         return new InputGateWithMetrics(singleGate, new SimpleCounter());
     }


[flink] 01/02: [hotfix] Migrate NettyShuffleEnvironmentTest to Junit5 and AssertJ.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d145febe96feb131996bc579d25f4dea0a93c4f0
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Dec 1 01:54:06 2022 +0800

    [hotfix] Migrate NettyShuffleEnvironmentTest to Junit5 and AssertJ.
---
 .../io/network/NettyShuffleEnvironmentTest.java    | 87 ++++++++++------------
 1 file changed, 41 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index d1ec985bc24..18c34e0f569 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -52,13 +52,10 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -70,27 +67,26 @@ import java.util.concurrent.Executors;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
 /** Various tests for the {@link NettyShuffleEnvironment} class. */
-public class NettyShuffleEnvironmentTest extends TestLogger {
+class NettyShuffleEnvironmentTest {
 
     private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
 
     private static FileChannelManager fileChannelManager;
 
-    @Rule public ExpectedException expectedException = ExpectedException.none();
-
-    @BeforeClass
-    public static void setUp() {
+    @BeforeAll
+    static void setUp() {
         fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
     }
 
-    @AfterClass
-    public static void shutdown() throws Exception {
+    @AfterAll
+    static void shutdown() throws Exception {
         fileChannelManager.close();
     }
 
@@ -100,7 +96,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
      * working with the bare minimum of required buffers.
      */
     @Test
-    public void testRegisterTaskWithLimitedBuffers() throws Exception {
+    void testRegisterTaskWithLimitedBuffers() throws Exception {
         // outgoing: 1 buffer per channel + 1 extra buffer per ResultPartition
         // incoming: 2 exclusive buffers per channel + 1 floating buffer per single gate
         final int bufferCount =
@@ -114,7 +110,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
      * fails if the bare minimum of required buffers is not available (we are one buffer short).
      */
     @Test
-    public void testRegisterTaskWithInsufficientBuffers() throws Exception {
+    void testRegisterTaskWithInsufficientBuffers() throws Exception {
         // outgoing: 1 buffer per channel + 1 extra buffer per ResultPartition
         // incoming: 2 exclusive buffers per channel + 1 floating buffer per single gate
         final int bufferCount =
@@ -124,13 +120,13 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
                                         .defaultValue()
                         - 1;
 
-        expectedException.expect(IOException.class);
-        expectedException.expectMessage("Insufficient number of network buffers");
-        testRegisterTaskWithLimitedBuffers(bufferCount);
+        assertThatThrownBy(() -> testRegisterTaskWithLimitedBuffers(bufferCount))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Insufficient number of network buffers");
     }
 
     @Test
-    public void testSlowIODoesNotBlockRelease() throws Exception {
+    void testSlowIODoesNotBlockRelease() throws Exception {
         BlockerSync sync = new BlockerSync();
         ResultPartitionManager blockingResultPartitionManager =
                 new ResultPartitionManager() {
@@ -154,7 +150,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void testRegisteringDebloatingMetrics() throws IOException {
+    void testRegisteringDebloatingMetrics() throws IOException {
         Map<String, Metric> metrics = new ConcurrentHashMap<>();
         final TaskMetricGroup taskMetricGroup = createTaskMetricGroup(metrics);
         final Configuration config = new Configuration();
@@ -183,22 +179,20 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
                                     new NettyShuffleDescriptorBuilder().buildRemote()
                                 })));
         for (int i = 0; i < 2; i++) {
-            assertEquals(
-                    TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().getBytes(),
-                    (long)
+            assertThat(
                             ((Gauge<Integer>)
                                             getDebloatingMetric(
                                                     metrics, i, MetricNames.DEBLOATED_BUFFER_SIZE))
-                                    .getValue());
-            assertEquals(
-                    0L,
-                    (long)
+                                    .getValue())
+                    .isEqualTo(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().getBytes());
+            assertThat(
                             ((Gauge<Long>)
                                             getDebloatingMetric(
                                                     metrics,
                                                     i,
                                                     MetricNames.ESTIMATED_TIME_TO_CONSUME_BUFFERS))
-                                    .getValue());
+                                    .getValue())
+                    .isZero();
         }
     }
 
@@ -269,29 +263,30 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
         Task.setupPartitionsAndGates(resultPartitions, inputGates);
 
         // verify buffer pools for the result partitions
-        assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
-        assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
-        assertEquals(expectedBuffers, rp3.getBufferPool().getMaxNumberOfMemorySegments());
-        assertEquals(expectedRp4Buffers, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+        assertThat(rp1.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE);
+        assertThat(rp2.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE);
+        assertThat(rp3.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(expectedBuffers);
+        assertThat(rp4.getBufferPool().getMaxNumberOfMemorySegments())
+                .isEqualTo(expectedRp4Buffers);
 
         for (ResultPartition rp : resultPartitions) {
-            assertEquals(
-                    rp.getNumberOfSubpartitions() + 1,
-                    rp.getBufferPool().getNumberOfRequiredMemorySegments());
-            assertEquals(rp.getNumberOfSubpartitions() + 1, rp.getBufferPool().getNumBuffers());
+            assertThat(rp.getBufferPool().getNumberOfRequiredMemorySegments())
+                    .isEqualTo(rp.getNumberOfSubpartitions() + 1);
+            assertThat(rp.getBufferPool().getNumBuffers())
+                    .isEqualTo(rp.getNumberOfSubpartitions() + 1);
         }
 
         // verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
         // for exclusive buffers not managed by the buffer pool)
-        assertEquals(1, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
-        assertEquals(1, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
-        assertEquals(1, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
-        assertEquals(1, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
-
-        assertEquals(floatingBuffers, ig1.getBufferPool().getMaxNumberOfMemorySegments());
-        assertEquals(floatingBuffers, ig2.getBufferPool().getMaxNumberOfMemorySegments());
-        assertEquals(floatingBuffers, ig3.getBufferPool().getMaxNumberOfMemorySegments());
-        assertEquals(floatingBuffers, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+        assertThat(ig1.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
+        assertThat(ig2.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
+        assertThat(ig3.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
+        assertThat(ig4.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
+
+        assertThat(ig1.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers);
+        assertThat(ig2.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers);
+        assertThat(ig3.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers);
+        assertThat(ig4.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers);
 
         verify(ig1, times(1)).setupChannels();
         verify(ig2, times(1)).setupChannels();