You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/27 07:11:14 UTC

[flink-benchmarks] branch master updated: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new de04957  [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
de04957 is described below

commit de049572690f4f744c9706dba5268e242f9e3bc8
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Aug 24 12:19:10 2020 +0200

    [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
    
    The benchmark results are as follows:
    
    Benchmark                                             (mode)   Mode  Cnt   Score   Error    Units
    UnalignedCheckpointTimeBenchmark.unalignedCheckpoint  REMOTE  thrpt   30  46.774 ± 3.111  ops/min
    UnalignedCheckpointTimeBenchmark.unalignedCheckpoint   LOCAL  thrpt   30  55.811 ± 3.590  ops/min
    
    Benchmark                                             (mode)   Mode  Cnt      Score      Error   Units
    RemoteChannelThroughputBenchmark.remoteRebalance    ALIGNED    thrpt   30  14499.562 ± 1019.582  ops/ms
    RemoteChannelThroughputBenchmark.remoteRebalance    UNALIGNED  thrpt   30  13393.426 ±  587.422  ops/ms
---
 .../flink/benchmark/FlinkEnvironmentContext.java   |   3 +-
 .../RemoteChannelThroughputBenchmark.java          |   5 +
 .../UnalignedCheckpointTimeBenchmark.java          | 158 +++++++++++++++++++++
 3 files changed, 165 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index c60a9b2..c3d1b2c 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -35,7 +35,7 @@ public class FlinkEnvironmentContext {
 
     public static final int NUM_NETWORK_BUFFERS = 1000;
 
-    public final StreamExecutionEnvironment env = getStreamExecutionEnvironment();
+    public StreamExecutionEnvironment env;
 
     protected final int parallelism = 1;
     protected final boolean objectReuse = true;
@@ -43,6 +43,7 @@ public class FlinkEnvironmentContext {
     @Setup
     public void setUp() throws IOException {
         // set up the execution environment
+        env = getStreamExecutionEnvironment();
         env.setParallelism(parallelism);
         env.getConfig().disableSysoutLogging();
         if (objectReuse) {
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
index 2cb0c7f..a88deea 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.runner.Runner;
@@ -48,6 +49,9 @@ public class RemoteChannelThroughputBenchmark extends BenchmarkBase {
 
     private MiniCluster miniCluster;
 
+    @Param({"ALIGNED", "UNALIGNED"})
+    public String mode = "ALIGNED";
+
     public static void main(String[] args)
             throws RunnerException {
         Options options = new OptionsBuilder()
@@ -80,6 +84,7 @@ public class RemoteChannelThroughputBenchmark extends BenchmarkBase {
         StreamExecutionEnvironment env = context.env;
         env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
         env.setParallelism(PARALLELISM);
+        env.getCheckpointConfig().enableUnalignedCheckpoints(!mode.equals("ALIGNED"));
 
         DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_SUBTASK));
         source
diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
new file mode 100644
index 0000000..463f91a
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+/**
+ * The benchmark for measuring the time taken to finish the configured number of
+ * unaligned checkpoints.
+ */
+@OutputTimeUnit(MINUTES)
+@OperationsPerInvocation(value = UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
+    public static final int NUM_FINISHED_CHECKPOINTS = 5;
+    private static final int NUM_VERTICES = 3;
+    private static final int PARALLELISM = 4;
+    private static final long CHECKPOINT_INTERVAL_MS = 10;
+
+    /** Runs the benchmarks whose names match this class's canonical name via the JMH {@link Runner}. */
+    public static void main(String[] args) throws RunnerException {
+        String benchmarkPattern = UnalignedCheckpointTimeBenchmark.class.getCanonicalName();
+        Options jmhOptions =
+            new OptionsBuilder().verbosity(VerboseMode.NORMAL).include(benchmarkPattern).build();
+
+        new Runner(jmhOptions).run();
+    }
+
+    @Benchmark
+    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext context) throws Exception {
+        // Each operator gets its own slot sharing group and is connected via rebalance().
+        context.env
+            .addSource(new FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS)).slotSharingGroup("source")
+            .rebalance()
+            .map((MapFunction<byte[], byte[]>) value -> value).slotSharingGroup("map")
+            .rebalance()
+            .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+        context.env.execute();
+    }
+
+    public static class UnalignedCheckpointEnvironmentContext extends FlinkEnvironmentContext {
+
+        @Param({"REMOTE", "LOCAL"})
+        public String mode = "REMOTE";
+
+        @Setup
+        public void setUp() throws IOException {
+            super.setUp();
+
+            env.setParallelism(parallelism);
+            env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
+            env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+        }
+
+        protected Configuration createConfiguration() {
+            Configuration conf = super.createConfiguration();
+
+            if (mode.equals("REMOTE")) {
+                conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+                conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_VERTICES * PARALLELISM);
+            } else {
+                conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_VERTICES * PARALLELISM);
+                conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+            }
+            return conf;
+        }
+    }
+
+    /**
+     * A parallel source that keeps emitting one fixed 1 KiB record and stops itself once it has
+     * been notified of the configured number of completed checkpoints.
+     */
+    public static class FiniteCheckpointSource extends RichParallelSourceFunction<byte[]> implements CheckpointListener {
+        /** The payload instance that is emitted over and over again. */
+        private final byte[] bytes = new byte[1024];
+        private final int numExpectedCheckpoints;
+
+        // Volatile because run() reads what cancel() and notifyCheckpointComplete() write.
+        private volatile int numFinishedCheckpoints;
+        private volatile boolean running = true;
+
+        FiniteCheckpointSource(int numCheckpoints) {
+            numExpectedCheckpoints = numCheckpoints;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            numFinishedCheckpoints++;
+        }
+
+        @Override
+        public void run(SourceContext<byte[]> ctx) {
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(bytes);
+                    if (numExpectedCheckpoints <= numFinishedCheckpoints) {
+                        cancel();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /**
+     * A sink that drops every record after pausing briefly, so that in-flight buffers
+     * accumulate upstream and can even build up back pressure.
+     */
+    public static class SlowDiscardSink<T> implements SinkFunction<T> {
+        private static final long PAUSE_PER_RECORD_MS = 3;
+        @Override
+        public void invoke(T value, SinkFunction.Context context) throws Exception {
+            Thread.sleep(PAUSE_PER_RECORD_MS);
+        }
+    }
+}