You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/27 07:11:14 UTC
[flink-benchmarks] branch master updated:
[FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new de04957 [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
de04957 is described below
commit de049572690f4f744c9706dba5268e242f9e3bc8
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Aug 24 12:19:10 2020 +0200
[FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
The benchmark results are as follows:
Benchmark (mode) Mode Cnt Score Error Units
UnalignedCheckpointTimeBenchmark.unalignedCheckpoint REMOTE thrpt 30 46.774 ± 3.111 ops/min
UnalignedCheckpointTimeBenchmark.unalignedCheckpoint LOCAL thrpt 30 55.811 ± 3.590 ops/min
Benchmark (mode) Mode Cnt Score Error Units
RemoteChannelThroughputBenchmark.remoteRebalance ALIGNED thrpt 30 14499.562 ± 1019.582 ops/ms
RemoteChannelThroughputBenchmark.remoteRebalance UNALIGNED thrpt 30 13393.426 ± 587.422 ops/ms
---
.../flink/benchmark/FlinkEnvironmentContext.java | 3 +-
.../RemoteChannelThroughputBenchmark.java | 5 +
.../UnalignedCheckpointTimeBenchmark.java | 158 +++++++++++++++++++++
3 files changed, 165 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index c60a9b2..c3d1b2c 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -35,7 +35,7 @@ public class FlinkEnvironmentContext {
public static final int NUM_NETWORK_BUFFERS = 1000;
- public final StreamExecutionEnvironment env = getStreamExecutionEnvironment();
+ public StreamExecutionEnvironment env;
protected final int parallelism = 1;
protected final boolean objectReuse = true;
@@ -43,6 +43,7 @@ public class FlinkEnvironmentContext {
@Setup
public void setUp() throws IOException {
// set up the execution environment
+ env = getStreamExecutionEnvironment();
env.setParallelism(parallelism);
env.getConfig().disableSysoutLogging();
if (objectReuse) {
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
index 2cb0c7f..a88deea 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
@@ -48,6 +49,9 @@ public class RemoteChannelThroughputBenchmark extends BenchmarkBase {
private MiniCluster miniCluster;
+ @Param({"ALIGNED", "UNALIGNED"})
+ public String mode = "ALIGNED";
+
public static void main(String[] args)
throws RunnerException {
Options options = new OptionsBuilder()
@@ -80,6 +84,7 @@ public class RemoteChannelThroughputBenchmark extends BenchmarkBase {
StreamExecutionEnvironment env = context.env;
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
env.setParallelism(PARALLELISM);
+ env.getCheckpointConfig().enableUnalignedCheckpoints(!mode.equals("ALIGNED"));
DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_SUBTASK));
source
diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
new file mode 100644
index 0000000..463f91a
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+/**
+ * The benchmark for measuring the time taken to finish the configured number of
+ * unaligned checkpoints.
+ */
+@OutputTimeUnit(MINUTES)
+@OperationsPerInvocation(value = UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
+ public static final int NUM_FINISHED_CHECKPOINTS = 5;
+ private static final int NUM_VERTICES = 3;
+ private static final int PARALLELISM = 4;
+ private static final long CHECKPOINT_INTERVAL_MS = 10;
+
+ public static void main(String[] args) throws RunnerException { // standalone entry point: runs this benchmark through the JMH Runner
+ Options options = new OptionsBuilder()
+ .verbosity(VerboseMode.NORMAL)
+ .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName()) // select benchmarks by this class's canonical name
+ .build();
+
+ new Runner(options).run();
+ }
+
+ @Benchmark
+ public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext context) throws Exception { // builds and executes one source -> map -> sink job per invocation
+ StreamExecutionEnvironment env = context.env;
+ DataStreamSource<byte[]> source = env.addSource(new FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS)); // the source ends its own loop after that many completed checkpoints
+ source
+ .slotSharingGroup("source").rebalance() // each stage is assigned its own slot sharing group
+ .map((MapFunction<byte[], byte[]>) value -> value).slotSharingGroup("map").rebalance() // identity map: passes every record through unchanged
+ .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+
+ env.execute(); // presumably blocks until the job has finished, i.e. after the source exits — TODO confirm
+ }
+
+ public static class UnalignedCheckpointEnvironmentContext extends FlinkEnvironmentContext { // environment for this benchmark: unaligned checkpointing on, cluster layout chosen by `mode`
+
+ @Param({"REMOTE", "LOCAL"})
+ public String mode = "REMOTE"; // read by createConfiguration() to pick the task manager / slot layout
+
+ @Setup
+ public void setUp() throws IOException {
+ super.setUp();
+
+ env.setParallelism(parallelism); // NOTE(review): lowercase `parallelism` is not the PARALLELISM constant that sizes the slots below — confirm which one is intended
+ env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
+ env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+ }
+
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+
+ if (mode.equals("REMOTE")) { // REMOTE: NUM_VERTICES * PARALLELISM task managers with one slot each
+ conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+ conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_VERTICES * PARALLELISM);
+ } else { // any other value (LOCAL): a single task manager holding all NUM_VERTICES * PARALLELISM slots
+ conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_VERTICES * PARALLELISM);
+ conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ }
+ return conf;
+ }
+ }
+
+ /**
+ * A source that keeps emitting the same 1024-byte record until at least the configured number of checkpoint-complete notifications has been received.
+ */
+ public static class FiniteCheckpointSource extends RichParallelSourceFunction<byte[]> implements CheckpointListener {
+
+ private final int numExpectedCheckpoints;
+ private final byte[] bytes = new byte[1024]; // the single payload instance passed to collect() for every record
+
+ private volatile boolean running = true;
+ private volatile int numFinishedCheckpoints; // written in notifyCheckpointComplete(), read in run()
+
+ FiniteCheckpointSource(int numCheckpoints) {
+ this.numExpectedCheckpoints = numCheckpoints;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ ++numFinishedCheckpoints; // NOTE(review): increment on a volatile field is not atomic; fine only if a single thread delivers notifications — confirm
+ }
+
+
+ @Override
+ public void run(SourceContext<byte[]> ctx) {
+ while (running) {
+ synchronized (ctx.getCheckpointLock()) { // emit while holding the checkpoint lock
+ ctx.collect(bytes);
+
+ if (numFinishedCheckpoints >= numExpectedCheckpoints) { // enough checkpoints completed: end the emit loop
+ cancel();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ /**
+ * A custom sink that processes records slowly so that in-flight buffers
+ * accumulate, possibly up to the point of back pressure.
+ */
+ public static class SlowDiscardSink<T> implements SinkFunction<T> {
+
+ @Override
+ public void invoke(T value, SinkFunction.Context context) throws Exception {
+ Thread.sleep(3); // 3 ms pause per record; the value itself is never used
+ }
+ }
+}