Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/19 12:35:12 UTC

[GitHub] [flink-benchmarks] zhijiangW opened a new pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

zhijiangW opened a new pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2


   The results are as follows:
   
   # Run complete. Total time: 00:12:56
   
   Benchmark                                                           Mode  Cnt     Score     Error   Units
   UnalignedCheckpointBenchmark.unalignedCheckpointWithLocalChannel   thrpt   30  1985.953 ± 156.639  ops/ms
   UnalignedCheckpointBenchmark.unalignedCheckpointWithRemoteChannel  thrpt   30  1327.177 ±  86.421  ops/ms
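To compare the two rows directly, the following sketch computes the throughput ratio and checks whether the error bars overlap. It assumes the `Error` column is the half-width of JMH's 99.9% confidence interval. The class is purely illustrative and is not part of the benchmark code.

```java
// Compares the two scores from the JMH run above (values copied verbatim, in ops/ms).
// Assumption: the "Error" column is the half-width of JMH's 99.9% confidence interval.
final class ScoreComparison {

    static final double LOCAL_SCORE = 1985.953;
    static final double LOCAL_ERROR = 156.639;
    static final double REMOTE_SCORE = 1327.177;
    static final double REMOTE_ERROR = 86.421;

    /** How many times larger throughput a is than throughput b. */
    static double ratio(double a, double b) {
        return a / b;
    }

    /** True if [a - aErr, a + aErr] and [b - bErr, b + bErr] overlap. */
    static boolean intervalsOverlap(double a, double aErr, double b, double bErr) {
        return a - aErr <= b + bErr && b - bErr <= a + aErr;
    }

    public static void main(String[] args) {
        System.out.printf("local/remote ratio: %.2f%n", ratio(LOCAL_SCORE, REMOTE_SCORE));
        System.out.println("error intervals overlap: "
                + intervalsOverlap(LOCAL_SCORE, LOCAL_ERROR, REMOTE_SCORE, REMOTE_ERROR));
    }
}
```

It prints a ratio of about 1.50 and `false` for the overlap check. In this run, the local-channel variant reaches roughly 1.5x the throughput of the remote-channel variant, and the gap is larger than the reported error.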


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477106058



##########
File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##########
@@ -48,6 +49,9 @@
 
     private MiniCluster miniCluster;
 
+    @Param({"AlignedCheckpoint", "UnalignedCheckpoint"})

Review comment:
       There are no external users that I know of :)







[GitHub] [flink-benchmarks] zhijiangW edited a comment on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW edited a comment on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678969057


   I found that the checkpoint interval is limited: it must not be less than 10ms. So would it make more sense to guarantee that the checkpointing time in our benchmark is longer than 10ms, or to relax the limitation in `CheckpointConfig` so that any interval greater than `0` is allowed?
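To make the limitation concrete, here is a minimal model of the rule. The class is hypothetical and is not Flink's `CheckpointConfig` source. It only encodes the 10ms lower bound described above.

```java
// Hypothetical model of the minimum-interval rule; not Flink's implementation.
final class CheckpointIntervalRule {

    /** Smallest accepted checkpoint interval, per the limitation described above. */
    static final long MINIMAL_CHECKPOINT_INTERVAL_MS = 10;

    /** Returns the interval unchanged, or throws if it is below the minimum. */
    static long validateInterval(long intervalMs) {
        if (intervalMs < MINIMAL_CHECKPOINT_INTERVAL_MS) {
            throw new IllegalArgumentException(
                    "Checkpoint interval must be at least " + MINIMAL_CHECKPOINT_INTERVAL_MS
                            + " ms, but was " + intervalMs + " ms");
        }
        return intervalMs;
    }

    /** True if the given interval would be accepted. */
    static boolean isAccepted(long intervalMs) {
        try {
            validateInterval(intervalMs);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (long interval : new long[] {0, 5, 10, 100}) {
            System.out.println(interval + " ms accepted: " + isAccepted(interval));
        }
    }
}
```

Under this rule, `0` and `5` are rejected while `10` and `100` pass. Allowing a `0ms` interval would therefore require a change to the production code.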





[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477009866



##########
File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##########
@@ -80,6 +84,7 @@ public void remoteRebalance(FlinkEnvironmentContext context) throws Exception {
         StreamExecutionEnvironment env = context.env;
         env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
         env.setParallelism(PARALLELISM);
+        env.getCheckpointConfig().enableUnalignedCheckpoints(!mode.equals("AlignedCheckpoint"));

Review comment:
       Either option sounds fine to me. But where is the codespeed UI and can I modify it?







[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678889692


   > But the problem with the current setup is that if checkpointing time is small, you will be measuring just the time to complete last checkpoint, instead of 5. And the problem is that even if the current interval makes sense, the checkpointing time can change in the future, as we improve things, which would render this benchmark obsolete.
   
   You are right. I misunderstood the semantics of the checkpoint interval before. I wrongly thought that the interval only applied to triggering the first checkpoint, and that each following checkpoint would be triggered immediately after the preceding one completes. As you said, an interval of `0` would be appropriate for avoiding delays between intermediate checkpoints.
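A rough timing model shows how much of the measured time goes to waiting rather than checkpointing. This is a sketch under simplifying assumptions: at most one checkpoint is in flight, checkpoint `k` becomes due at `k * interval`, and a due checkpoint starts as soon as the previous one completes. Flink's actual scheduling is more involved, and the class and its inputs are hypothetical.

```java
// Simplified timing model: at most one concurrent checkpoint, fixed-rate triggering.
// An illustration only, not Flink's CheckpointCoordinator logic.
final class CheckpointTimingModel {

    /**
     * Time (ms) at which the last of {@code numCheckpoints} checkpoints completes,
     * assuming checkpoint k (1-based) becomes due at k * intervalMs and each takes durationMs.
     */
    static long timeToFinish(int numCheckpoints, long intervalMs, long durationMs) {
        long previousEnd = 0;
        for (int k = 1; k <= numCheckpoints; k++) {
            long due = k * intervalMs;
            // A due checkpoint has to wait until the previous one has completed.
            long start = Math.max(due, previousEnd);
            previousEnd = start + durationMs;
        }
        return previousEnd;
    }

    /** Fraction of the total time that is not spent inside a checkpoint. */
    static double idleFraction(int numCheckpoints, long intervalMs, long durationMs) {
        long total = timeToFinish(numCheckpoints, intervalMs, durationMs);
        return 1.0 - (double) (numCheckpoints * durationMs) / total;
    }

    public static void main(String[] args) {
        // Hypothetical: 5 checkpoints of 20 ms each, compared across intervals.
        for (long interval : new long[] {100, 10, 1}) {
            System.out.printf("interval=%d ms total=%d ms idle=%.0f%%%n",
                    interval, timeToFinish(5, interval, 20), 100 * idleFraction(5, interval, 20));
        }
    }
}
```

With the hypothetical 20ms checkpoint duration, five checkpoints take 520ms at a 100ms interval, and roughly 80% of that is idle waiting. At a 10ms interval they take only 110ms, with about 9% idle, so the score then mostly tracks checkpointing time.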





[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477276471



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,17 +43,22 @@
 
 import static java.util.concurrent.TimeUnit.HOURS;
 
+/**
+ * The benchmark for measuring the time taken to finish the configured number of
+ * unaligned checkpoints.
+ */
 @OutputTimeUnit(HOURS)

Review comment:
       NIT: maybe per minute?
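Switching `@OutputTimeUnit` only rescales the same measurement. The sketch below converts a throughput expressed per hour into per-minute and per-millisecond values using `java.util.concurrent.TimeUnit`. The score value is hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Rescales a throughput score between output time units (illustrative only).
final class ThroughputUnits {

    /** Converts "ops per {@code from}" into "ops per {@code to}". */
    static double rescale(double opsPerFrom, TimeUnit from, TimeUnit to) {
        // ops/from * (nanos in 'to' / nanos in 'from') = ops/to
        return opsPerFrom * to.toNanos(1) / (double) from.toNanos(1);
    }

    public static void main(String[] args) {
        double perHour = 3600.0; // hypothetical score: one checkpoint per second
        System.out.println("per minute: " + rescale(perHour, TimeUnit.HOURS, TimeUnit.MINUTES));
        System.out.println("per ms:     " + rescale(perHour, TimeUnit.HOURS, TimeUnit.MILLISECONDS));
    }
}
```

A per-minute unit keeps scores in a readable range (here 60 instead of 0.001 per ms). This appears to be the readability concern behind the `@OutputTimeUnit` discussion in this review.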







[GitHub] [flink-benchmarks] pnowojski commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678973685


   I think `10ms` is short enough that checkpoints will not complete that quickly in the foreseeable future, so IMO it would be good enough to just set the interval to `10ms`. It would also leave a couple of ms for back pressure to build up at startup.
   
   From this perspective, I think it's probably not worth changing the production code to allow a `0ms` checkpoint interval.
   





[GitHub] [flink-benchmarks] xintongsong commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
xintongsong commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-681270940


   @pnowojski 
   I assume you assigned me as a reviewer by mistake? I'm not at all familiar with the background of this effort.





[GitHub] [flink-benchmarks] zhijiangW edited a comment on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW edited a comment on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678890642


   > Do you think this can happen? Sources will produce a huge number of records immediately (millions of records per second). Even if a checkpoint is triggered just a couple of ms after the source started, the task should already be fully back pressured. If this worries you, maybe there is a way of delaying the first checkpoint?
   
   It also depends on how many in-flight buffers the network settings provide. Actually, my previous interval of 100ms was mainly meant to delay the first checkpoint until back pressure builds up, and I overlooked the delay it adds between intermediate checkpoints. I also verified that input and output in-flight buffers are indeed spilled during checkpointing. I will try out other ways to make sure back pressure happens.
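Whether back pressure is reached before the first checkpoint can be roughly estimated as in-flight buffer capacity divided by the source's output rate. The inputs below are assumptions for illustration, not values measured in this benchmark: 16 channels (for example, a 4x4 rebalance), 10 buffers per channel, 32 KiB buffers (Flink's default memory segment size), and an aggregate source rate of 1 GB/s.

```java
// Back-of-the-envelope estimate of how quickly in-flight buffers fill up.
// All inputs are hypothetical; plug in the benchmark's real network settings.
final class BackPressureEstimate {

    /** Total bytes that can be in flight across all channels. */
    static long inFlightCapacityBytes(int channels, int buffersPerChannel, int bufferSizeBytes) {
        return (long) channels * buffersPerChannel * bufferSizeBytes;
    }

    /** Milliseconds needed to fill that capacity at the given output rate. */
    static double millisToFill(long capacityBytes, double bytesPerSecond) {
        return capacityBytes / bytesPerSecond * 1000.0;
    }

    public static void main(String[] args) {
        int channels = 4 * 4;         // e.g. 4 senders x 4 receivers with rebalance
        int buffersPerChannel = 10;   // assumed
        int bufferSize = 32 * 1024;   // 32 KiB per buffer
        double rate = 1e9;            // assumed aggregate rate of 1 GB/s
        long capacity = inFlightCapacityBytes(channels, buffersPerChannel, bufferSize);
        System.out.printf("capacity=%d bytes, ~%.1f ms to fill%n",
                capacity, millisToFill(capacity, rate));
    }
}
```

With these assumed inputs, the estimate is about 5 ms. Doubling the buffers per channel or halving the rate doubles it, so the delay needed before the first checkpoint depends directly on the network configuration.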





[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477008616



##########
File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##########
@@ -48,6 +49,9 @@
 
     private MiniCluster miniCluster;
 
+    @Param({"AlignedCheckpoint", "UnalignedCheckpoint"})

Review comment:
       Actually, I tried the shorter names in an early version, but I was worried they might confuse external users, so I eventually switched to the descriptive names. Anyway, I have no preference here and will change it as you suggested. 







[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r474641711



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)

Review comment:
       If that's the sole motivation, maybe change the `@OutputTimeUnit(MILLISECONDS)` to seconds or minutes for this benchmark?







[GitHub] [flink-benchmarks] pnowojski commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-681620966


   Yes, sorry @xintongsong!





[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-679053969


   @pnowojski, I have updated the code based on the comments!





[GitHub] [flink-benchmarks] pnowojski commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678248194


   > I think my current approach serves exactly this purpose: just set N = 5, and the default number of concurrent checkpoints should be 1, if I remember correctly. I am not quite sure whether it is really necessary to set an even smaller interval, since 100ms is already small enough in most practical cases. If we give much smaller values, the backpressure might not be triggered before N checkpoints finished.
   
   But the problem with the current setup is that if checkpointing time is small, you will be measuring just the time to complete last checkpoint, instead of 5. And the problem is that even if the current interval makes sense, the checkpointing time can change in the future, as we improve things, which would render this benchmark obsolete.
   
   That's why I suggested to just trigger checkpoint as quickly as possible, to avoid this problem.
   
   >  If we give much smaller values, the backpressure might not be triggered before N checkpoints finished.
   
   Do you think this can happen? Sources will produce a huge number of records immediately (millions of records per second). Even if a checkpoint is triggered just a couple of `ms` after the source started, the task should already be fully back pressured.
   
   If this worries you, maybe there is a way of delaying the first checkpoint?





[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r475537292



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+
+@OutputTimeUnit(HOURS)
+public class UnalignedCheckpointBenchmark extends BenchmarkBase {

Review comment:
       `@OperationsPerInvocation(NUM_FINISHED_CHECKPOINTS)`?
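With `@OperationsPerInvocation(NUM_FINISHED_CHECKPOINTS)`, JMH counts each invocation as that many operations, so in throughput mode the score becomes "checkpoints per time unit" instead of "job runs per time unit". The sketch below mirrors that arithmetic with a hypothetical invocation count and elapsed time.

```java
import java.util.concurrent.TimeUnit;

// Mirrors the arithmetic of @OperationsPerInvocation in throughput mode (illustrative only).
final class PerCheckpointThroughput {

    /**
     * Throughput in operations per {@code unit}, where each invocation counts as
     * {@code opsPerInvocation} operations.
     */
    static double throughput(long invocations, int opsPerInvocation, long elapsedNanos, TimeUnit unit) {
        double ops = (double) invocations * opsPerInvocation;
        return ops * unit.toNanos(1) / elapsedNanos;
    }

    public static void main(String[] args) {
        // Hypothetical: 3 job runs, each finishing 5 checkpoints, took 1.5 s in total.
        long elapsed = TimeUnit.MILLISECONDS.toNanos(1500);
        System.out.println("checkpoints/min: " + throughput(3, 5, elapsed, TimeUnit.MINUTES));
        System.out.println("job runs/min:    " + throughput(3, 1, elapsed, TimeUnit.MINUTES));
    }
}
```

The per-checkpoint score is five times the per-run score. Normalizing this way keeps results roughly comparable if the number of finished checkpoints per run is changed later.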

##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+
+@OutputTimeUnit(HOURS)
+public class UnalignedCheckpointBenchmark extends BenchmarkBase {

Review comment:
       nit: add a Javadoc noting that this benchmark measures the time to perform a checkpoint? And/or maybe rename it to `UnalignedCheckpointTimeBenchmark`?

##########
File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##########
@@ -80,6 +84,7 @@ public void remoteRebalance(FlinkEnvironmentContext context) throws Exception {
         StreamExecutionEnvironment env = context.env;
         env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
         env.setParallelism(PARALLELISM);
+        env.getCheckpointConfig().enableUnalignedCheckpoints(!mode.equals("AlignedCheckpoint"));

Review comment:
       Good idea for re-using this benchmark :)
   
   Before merging this, to preserve continuity with the previous benchmark results, it would be a good idea to rename the `remoteRebalance` benchmark in the codespeed UI to `remoteRebalance.ALIGNED`.
   
   Alternatively after merging, we can remove the `remoteRebalance` benchmark from the UI to clean it up.
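The rename preserves continuity because, once the new `@Param` is added, each result is reported per parameter value, and the old unparameterized runs used aligned checkpoints. The helper below assumes the codespeed name takes the form `<benchmark method>.<param value>`, as the suggested `remoteRebalance.ALIGNED` implies. The helper itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derives per-parameter result names, e.g. "remoteRebalance.ALIGNED".
final class ResultNames {

    static String resultName(String benchmarkMethod, String paramValue) {
        return benchmarkMethod + "." + paramValue;
    }

    static List<String> resultNames(String benchmarkMethod, String... paramValues) {
        List<String> names = new ArrayList<>();
        for (String value : paramValues) {
            names.add(resultName(benchmarkMethod, value));
        }
        return names;
    }

    public static void main(String[] args) {
        // The old, unparameterized series corresponds to the aligned variant.
        System.out.println(resultNames("remoteRebalance", "ALIGNED", "UNALIGNED"));
    }
}
```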

##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+
+@OutputTimeUnit(HOURS)
+public class UnalignedCheckpointBenchmark extends BenchmarkBase {
+    private static final int NUM_VERTICES = 3;
+    private static final int PARALLELISM = 4;
+    private static final long CHECKPOINT_INTERVAL_MS = 10;
+    private static final int NUM_FINISHED_CHECKPOINTS = 5;
+
+    public static void main(String[] args) throws RunnerException {
+        Options options = new OptionsBuilder()
+            .verbosity(VerboseMode.NORMAL)
+            .include(UnalignedCheckpointBenchmark.class.getCanonicalName())
+            .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext context) throws Exception {
+        StreamExecutionEnvironment env = context.env;
+        DataStreamSource<byte[]> source = env.addSource(new FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
+        source
+            .slotSharingGroup("source").rebalance()
+            .map((MapFunction<byte[], byte[]>) value -> value).slotSharingGroup("map").rebalance()
+            .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+
+        env.execute();
+    }
+
+    public static class UnalignedCheckpointEnvironmentContext extends FlinkEnvironmentContext {
+
+        @Param({"RemoteChannel", "LocalChannel"})

Review comment:
       `REMOTE`/`LOCAL`?
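The hunk does not show `FiniteCheckpointSource` itself. Based on its constructor argument and the `CheckpointListener` import, it presumably emits records until a given number of checkpoints has completed. The plain-Java sketch below models only that stop condition and has no Flink dependency. All names other than `NUM_FINISHED_CHECKPOINTS` and `notifyCheckpointComplete` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of a source that stops after N completed checkpoints.
// Hypothetical sketch; not the benchmark's actual FiniteCheckpointSource.
final class FiniteCheckpointSourceModel {

    private final int numFinishedCheckpoints;
    private final AtomicInteger completedCheckpoints = new AtomicInteger();
    private volatile boolean running = true;

    FiniteCheckpointSourceModel(int numFinishedCheckpoints) {
        this.numFinishedCheckpoints = numFinishedCheckpoints;
    }

    /** Emits records until enough checkpoints have completed; returns how many were emitted. */
    long run() {
        long emitted = 0;
        while (running) {
            emitted++; // stands in for emitting a record downstream
        }
        return emitted;
    }

    /** Analogous to CheckpointListener#notifyCheckpointComplete(long). */
    void notifyCheckpointComplete(long checkpointId) {
        if (completedCheckpoints.incrementAndGet() >= numFinishedCheckpoints) {
            running = false;
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) throws InterruptedException {
        FiniteCheckpointSourceModel source = new FiniteCheckpointSourceModel(5);
        Thread emitter = new Thread(() -> System.out.println("emitted " + source.run() + " records"));
        emitter.start();
        for (long id = 1; id <= 5; id++) {
            Thread.sleep(10); // pretend each checkpoint takes about 10 ms
            source.notifyCheckpointComplete(id);
        }
        emitter.join();
    }
}
```

Because the job finishes only after the configured number of checkpoints completes, the execution time of `env.execute()` is what the benchmark measures.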

##########
File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##########
@@ -48,6 +49,9 @@
 
     private MiniCluster miniCluster;
 
+    @Param({"AlignedCheckpoint", "UnalignedCheckpoint"})

Review comment:
       `ALIGNED`/`UNALIGNED` to shorten the benchmark name plus to be consistent with the existing parameters (`ORDERED`/`UNORDERED`, `ROCKS`/`MEMORY`/`FS_ASYNC`/...)?
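If the values are renamed, the expression in the diff, `!mode.equals("AlignedCheckpoint")`, must change too. Otherwise `ALIGNED` would also enable unaligned checkpoints. The sketch below uses a hypothetical `CheckpointMode` enum to turn the string parameter into the boolean passed to `enableUnalignedCheckpoints(...)`, assuming the `@Param` field remains a `String`.

```java
// Hypothetical mapping from the benchmark's string parameter to the unaligned flag.
enum CheckpointMode {
    ALIGNED(false),
    UNALIGNED(true);

    private final boolean unaligned;

    CheckpointMode(boolean unaligned) {
        this.unaligned = unaligned;
    }

    /** Value to pass to enableUnalignedCheckpoints(...). */
    boolean isUnaligned() {
        return unaligned;
    }

    /** Parses the @Param string; throws IllegalArgumentException for unknown names. */
    static CheckpointMode fromParam(String mode) {
        return CheckpointMode.valueOf(mode);
    }
}
```

With this mapping, `CheckpointMode.fromParam(mode).isUnaligned()` would replace the string comparison. A misspelled or outdated parameter then fails loudly instead of silently selecting the unaligned mode.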







[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r474641711



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)

Review comment:
       If that's the sole motivation, maybe change the `@OutputTimeUnit(MILLISECONDS)` to seconds or minutes for this benchmark? The Web UI would still display it incorrectly, but I think that's the lesser evil.







[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-680795169


   @pnowojski, I have renamed `remoteRebalance` to `remoteRebalance.ALIGNED` in the UI, and all the comments are addressed in a separate commit.





[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r475371153



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)

Review comment:
       Yes, it would be better to adjust `OutputTimeUnit`.






[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r473849350



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)

Review comment:
       This is not correct: you are not using `RECORDS_PER_INVOCATION` to control the number of records per invocation.
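The following is a minimal sketch of why the annotation matters. It assumes a simplified model of JMH throughput scoring: (invocations * `@OperationsPerInvocation`) / elapsed time, expressed in the output time unit. Under that model, declaring `RECORDS_PER_INVOCATION = 10_000_000` operations scales the score by that factor even though each invocation only completes a handful of checkpoints. The class name, the 10 invocations in 10 s, and N = 5 are hypothetical values for illustration.

```java
import java.util.concurrent.TimeUnit;

/** Simplified model (an assumption, not JMH's code) of a throughput score with @OperationsPerInvocation. */
public class OpsPerInvocationScore {

    /** Score in operations per outputUnit: (invocations * opsPerInvocation) / elapsed time. */
    public static double score(long invocations, int opsPerInvocation, long elapsedNanos, TimeUnit outputUnit) {
        double ops = (double) invocations * opsPerInvocation;
        return ops * outputUnit.toNanos(1) / elapsedNanos;
    }

    public static void main(String[] args) {
        long elapsed = TimeUnit.SECONDS.toNanos(10); // hypothetical: 10 invocations took 10 s in total
        long invocations = 10;
        // Each invocation really completes N = 5 checkpoints.
        double honest = score(invocations, 5, elapsed, TimeUnit.MINUTES);
        // Declaring RECORDS_PER_INVOCATION operations per invocation instead.
        double inflated = score(invocations, 10_000_000, elapsed, TimeUnit.MINUTES);
        System.out.println(honest);            // 300.0 (checkpoints per minute)
        System.out.println(inflated / honest); // 2000000.0 (pure scaling, no extra work)
    }
}
```

If the goal is only readable numbers, changing the output time unit changes the scale without misstating how much work an invocation does.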

##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)
+public class UnalignedCheckpointBenchmark extends BenchmarkBase {
+    public static final int RECORDS_PER_INVOCATION = 10_000_000;
+    private static final int NUM_VERTICES = 3;
+    private static final int PARALLELISM = 4;
+    private static final long CHECKPOINT_INTERVAL_MS = 100;
+
+    public static void main(String[] args) throws RunnerException {
+        Options options = new OptionsBuilder()
+            .verbosity(VerboseMode.NORMAL)
+            .include(UnalignedCheckpointBenchmark.class.getCanonicalName())
+            .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void unalignedCheckpointWithRemoteChannel(UCRemoteEnvironmentContext context) throws Exception {
+        unalignedCheckpoint(context);
+    }
+
+    @Benchmark
+    public void unalignedCheckpointWithLocalChannel(UCLocalEnvironmentContext context) throws Exception {
+        unalignedCheckpoint(context);
+    }

Review comment:
       Just for the Web UI presentation, what about dropping those methods and introducing a parameter `mode = {"LOCAL", "REMOTE"}` that would configure those two benchmarks? As it is now, they would be presented in the UI as `unalignedCheckpointWithRemoteChannel` and `unalignedCheckpointWithLocalChannel`, which is a bit lengthy compared to `unalignedCheckpoint.LOCAL` and `unalignedCheckpoint.REMOTE`.
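Here is a hypothetical sketch of the suggested shape: one entry point driven by a mode value instead of two near-identical methods. In the real JMH benchmark, the mode would be a field annotated with `@Param({"LOCAL", "REMOTE"})` on a `@State` class. Here it is modeled with a plain enum so the sketch runs on the JDK alone. The `ChannelMode`, `parse`, `uiLabel`, and `describeSetup` names are illustrative assumptions. The `<method>.<param>` label shape follows the `unalignedCheckpoint.LOCAL` example in the comment.

```java
import java.util.Locale;

public class ModeParameterSketch {

    /** Hypothetical enum standing in for the @Param values "LOCAL" and "REMOTE". */
    public enum ChannelMode { LOCAL, REMOTE }

    /** @Param values are declared as strings; map them onto the enum. */
    public static ChannelMode parse(String value) {
        return ChannelMode.valueOf(value.toUpperCase(Locale.ROOT));
    }

    /** Label shape shown in the UI: "<benchmark method>.<param value>". */
    public static String uiLabel(String benchmarkMethod, ChannelMode mode) {
        return benchmarkMethod + "." + mode.name();
    }

    /** Single entry point; the mode picks which environment would be set up. */
    public static String describeSetup(ChannelMode mode) {
        switch (mode) {
            case LOCAL:
                return "local channels";  // role played by UCLocalEnvironmentContext today
            case REMOTE:
                return "remote channels"; // role played by UCRemoteEnvironmentContext today
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (String param : new String[] {"LOCAL", "REMOTE"}) {
            ChannelMode mode = parse(param);
            System.out.println(uiLabel("unalignedCheckpoint", mode) + " -> " + describeSetup(mode));
        }
    }
}
```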






[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678890642


   > Do you think this can happen? Sources will produce a huge number of records immediately (millions of records per second). Even if a checkpoint is triggered just a couple of ms after the source has started, the task should already be fully back-pressured. If this worries you, maybe there is a way of delaying the first checkpoint?
   
   It also depends on how many in-flight buffers we configure in the network settings. Actually, my previous 100 ms interval was mainly meant to delay the first checkpoint, and it ignored the delay of the intermediate checkpoints. I also tested to make sure that input and output in-flight buffers are spilled during checkpointing. I will try out other ways to make sure that back pressure happens.




[GitHub] [flink-benchmarks] zhijiangW edited a comment on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW edited a comment on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678890642


   > Do you think this can happen? Sources will produce a huge number of records immediately (millions of records per second). Even if a checkpoint is triggered just a couple of ms after the source has started, the task should already be fully back-pressured. If this worries you, maybe there is a way of delaying the first checkpoint?
   
   It also depends on how many in-flight buffers we configure in the network settings. Actually, my previous 100 ms interval was mainly meant to delay the first checkpoint so that back pressure could build up, and it ignored the delay of the intermediate checkpoints. I also tested to make sure that input and output in-flight buffers are spilled during checkpointing. I will try out other ways to make sure that back pressure happens.
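The back-of-the-envelope sketch below estimates how much data the input side of one remote input gate can buffer, and how fast that fills up. It rests on several assumptions:

- Flink network defaults of that era: 2 exclusive buffers per channel (`taskmanager.network.memory.buffers-per-channel`), 8 floating buffers per gate (`taskmanager.network.memory.floating-buffers-per-gate`), and 32 KiB memory segments.
- An all-to-all connection, so one channel per upstream subtask (`PARALLELISM = 4` from the diff).
- A hypothetical producer rate of 100 MiB/s.

None of these numbers are measurements from the PR.

```java
/** Rough estimate (assumptions, not measurements) of in-flight capacity of one input gate. */
public class InFlightBufferEstimate {

    /** Bytes buffered at most: (channels * exclusive buffers + floating buffers) * segment size. */
    public static long inputGateBytes(int channels, int buffersPerChannel, int floatingPerGate, int segmentBytes) {
        long buffers = (long) channels * buffersPerChannel + floatingPerGate;
        return buffers * segmentBytes;
    }

    /** Milliseconds until that capacity is full at a given producer rate in bytes per second. */
    public static double millisToFill(long capacityBytes, double bytesPerSecond) {
        return capacityBytes * 1000.0 / bytesPerSecond;
    }

    public static void main(String[] args) {
        int parallelism = 4; // one channel per upstream subtask, assuming all-to-all
        long bytes = inputGateBytes(parallelism, 2, 8, 32 * 1024);
        System.out.println(bytes); // 524288 (512 KiB)
        System.out.println(millisToFill(bytes, 100.0 * 1024 * 1024)); // 5.0
    }
}
```

Under these assumptions, the buffers fill within a few milliseconds. Raising the buffer counts increases both the time to reach back pressure and the amount of in-flight data an unaligned checkpoint has to persist.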




[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678969057


   I found that the checkpoint interval has a lower limit: it must not be less than 10 ms. So it would make more sense if we can guarantee that checkpointing takes longer than 10 ms in our benchmark.
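The sketch below illustrates why the checkpoint duration should exceed the 10 ms floor. It assumes a simplified model, not Flink's actual scheduler: with at most one concurrent checkpoint and no minimum pause, a new checkpoint starts roughly when both the interval has elapsed and the previous checkpoint has completed. The 10 ms constant mirrors the limit described above. The class and method names are hypothetical.

```java
/** Simplified model (an assumption) of the time between checkpoint starts. */
public class CheckpointCycleModel {

    // Lower bound on the checkpoint interval described in the comment above.
    public static final long MIN_INTERVAL_MS = 10;

    public static long validateInterval(long intervalMs) {
        if (intervalMs < MIN_INTERVAL_MS) {
            throw new IllegalArgumentException(
                "Checkpoint interval must be >= " + MIN_INTERVAL_MS + " ms, got " + intervalMs);
        }
        return intervalMs;
    }

    /** Approximate cycle: whichever is longer, the interval or the checkpoint itself. */
    public static long cycleMs(long intervalMs, long checkpointDurationMs) {
        return Math.max(validateInterval(intervalMs), checkpointDurationMs);
    }

    public static void main(String[] args) {
        // Fast checkpoints: the 10 ms floor dominates, so the benchmark mostly measures the interval.
        System.out.println(cycleMs(10, 3));  // 10
        // Slow checkpoints: the checkpoint duration dominates, which is what we want to measure.
        System.out.println(cycleMs(10, 40)); // 40
    }
}
```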




[GitHub] [flink-benchmarks] zhijiangW merged pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW merged pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2


   




[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r474522794



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)
+public class UnalignedCheckpointBenchmark extends BenchmarkBase {
+    public static final int RECORDS_PER_INVOCATION = 10_000_000;
+    private static final int NUM_VERTICES = 3;
+    private static final int PARALLELISM = 4;
+    private static final long CHECKPOINT_INTERVAL_MS = 100;
+
+    public static void main(String[] args) throws RunnerException {
+        Options options = new OptionsBuilder()
+            .verbosity(VerboseMode.NORMAL)
+            .include(UnalignedCheckpointBenchmark.class.getCanonicalName())
+            .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void unalignedCheckpointWithRemoteChannel(UCRemoteEnvironmentContext context) throws Exception {
+        unalignedCheckpoint(context);
+    }
+
+    @Benchmark
+    public void unalignedCheckpointWithLocalChannel(UCLocalEnvironmentContext context) throws Exception {
+        unalignedCheckpoint(context);
+    }

Review comment:
       I will take your suggestion.






[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r475404906



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)

Review comment:
       The results look normal after adjusting it to `HOURS`.






[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r478076502



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,17 +43,22 @@
 
 import static java.util.concurrent.TimeUnit.HOURS;
 
+/**
+ * The benchmark for measuring the time taken to finish the configured number of
+ * unaligned checkpoints.
+ */
 @OutputTimeUnit(HOURS)

Review comment:
       Makes sense, I have already adjusted it to `MINUTES`.






[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477276471



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,17 +43,22 @@
 
 import static java.util.concurrent.TimeUnit.HOURS;
 
+/**
+ * The benchmark for measuring the time taken to finish the configured number of
+ * unaligned checkpoints.
+ */
 @OutputTimeUnit(HOURS)

Review comment:
       NIT: maybe per minute? It doesn't matter whether the result is reported in values of a similar magnitude to those in other benchmarks, while on the other hand `number of checkpoints/minute` seems a bit more natural?
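To make the unit discussion concrete, the sketch below expresses one measurement per millisecond, per minute, and per hour using `java.util.concurrent.TimeUnit`. That is the same enum whose constants (`HOURS`, `MINUTES`) are passed to JMH's `@OutputTimeUnit`. The input of 5 checkpoints completed in 2 s is a hypothetical value, not a result from this PR.

```java
import java.util.concurrent.TimeUnit;

/** Sketch: the same checkpoint count expressed in different output time units. */
public class ThroughputUnits {

    /** Count per one unit of time: count * (nanos per unit) / elapsed nanos. */
    public static double perUnit(long count, long elapsedNanos, TimeUnit unit) {
        return (double) count * unit.toNanos(1) / elapsedNanos;
    }

    public static void main(String[] args) {
        long elapsed = TimeUnit.SECONDS.toNanos(2); // hypothetical: 5 checkpoints in 2 s
        System.out.println(perUnit(5, elapsed, TimeUnit.MILLISECONDS)); // 0.0025
        System.out.println(perUnit(5, elapsed, TimeUnit.MINUTES));      // 150.0
        System.out.println(perUnit(5, elapsed, TimeUnit.HOURS));        // 9000.0
    }
}
```

Per minute keeps the value in a readable range while still reading naturally as "checkpoints per minute".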






[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-678116298


   Thanks for the review and discussions, @pnowojski!
   
   > but maybe it would also be better to use FLIP-27 sources here?
   
   Considering that the legacy source is still the most commonly used one, it is also meaningful to measure performance in this case. But I agree we can add a FLIP-27 source variant as a follow-up.
    
   > 1. How quickly can we trigger and complete N checkpoints? Set the checkpointing interval to 0 ms (or as small a number as possible), make sure the number of concurrent checkpoints is set to 1, and just measure how quickly we can complete those checkpoints (note: in that case we should also set `@OperationsPerInvocation` to N). This with induced backpressure.
   
   I think my current approach serves exactly this purpose: it just sets `N = 5`, and the default number of concurrent checkpoints should be 1 if I remember correctly. I am not quite sure whether it is really necessary to set an even smaller interval, since 100 ms is already small enough for most practical cases. If we use much smaller values, backpressure might not be triggered before the N checkpoints finish.
   
   > 2. What is the throughput of the job with unaligned checkpoints compared to aligned checkpoints? Set the checkpointing interval to some reasonable value and measure how fast we can process N records without any backpressure.
   
   I agree to add this case as well, similar to what we did in `RemoteBalanceBenchmark`.
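The "complete N checkpoints" setup from item 1 can be sketched as follows. It assumes the source keeps emitting until N checkpoints have completed, with checkpoints completing one at a time because the maximum number of concurrent checkpoints is 1. In the real benchmark, this logic would presumably live in a `RichParallelSourceFunction` that also implements Flink's `CheckpointListener` (both are imported in the diff). Here it is plain Java so it runs without Flink. The class name and the driver loop are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical control flow: keep running until N checkpoints have completed. */
public class StopAfterNCheckpoints {

    private final int targetCheckpoints;
    private final AtomicInteger completed = new AtomicInteger();
    private volatile boolean running = true;

    public StopAfterNCheckpoints(int targetCheckpoints) {
        this.targetCheckpoints = targetCheckpoints;
    }

    /** Mirrors a checkpoint-complete callback: count completions and stop once N is reached. */
    public void notifyCheckpointComplete(long checkpointId) {
        if (completed.incrementAndGet() >= targetCheckpoints) {
            running = false;
        }
    }

    public boolean isRunning() {
        return running;
    }

    public int completedCheckpoints() {
        return completed.get();
    }

    public static void main(String[] args) {
        StopAfterNCheckpoints source = new StopAfterNCheckpoints(5); // N = 5 as in this PR
        long checkpointId = 1;
        while (source.isRunning()) {
            // Stand-in for the coordinator completing one checkpoint at a time.
            source.notifyCheckpointComplete(checkpointId++);
        }
        System.out.println(source.completedCheckpoints()); // 5
    }
}
```

With `@OperationsPerInvocation` set to N, each invocation then reports N checkpoints as its operations.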
   




[GitHub] [flink-benchmarks] zhijiangW commented on pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#issuecomment-681363205


   @xintongsong, I guess it was done by mistake; please ignore. :)




[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r474525191



##########
File path: src/main/java/org/apache/flink/benchmark/UnalignedCheckpointBenchmark.java
##########
@@ -0,0 +1,164 @@
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+
+@OperationsPerInvocation(value = UnalignedCheckpointBenchmark.RECORDS_PER_INVOCATION)

Review comment:
       This may be my misuse of `RECORDS_PER_INVOCATION`. My only purpose was to scale the results up by a reasonable factor so that they are readable and comparable; otherwise the raw results would be very small.



