You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/09/01 07:58:25 UTC

[flink-benchmarks] branch master updated (de04957 -> af7b427)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git.


    from de04957  [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
     new 479f7f7  [hotfix] Do not restart benchmarks in case of failures
     new 9499394  [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator
     new af7b427  Bump default Flink version to 1.12

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  2 +-
 .../flink/benchmark/FlinkEnvironmentContext.java   |  3 +-
 ...tBenchmark.java => MultipleInputBenchmark.java} | 65 ++++++++++----------
 .../apache/flink/benchmark/TwoInputBenchmark.java  |  4 +-
 .../operators/MultiplyByTwoOperatorFactory.java    | 70 ++++++++++++++++++++++
 5 files changed, 108 insertions(+), 36 deletions(-)
 copy src/main/java/org/apache/flink/benchmark/{TwoInputBenchmark.java => MultipleInputBenchmark.java} (60%)
 create mode 100644 src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java


[flink-benchmarks] 02/03: [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 94993944130e7975e0a4b99855b89dfd2e59a158
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 21 16:36:05 2020 +0100

    [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator
---
 ...tBenchmark.java => MultipleInputBenchmark.java} | 65 ++++++++++----------
 .../apache/flink/benchmark/TwoInputBenchmark.java  |  4 +-
 .../operators/MultiplyByTwoOperatorFactory.java    | 70 ++++++++++++++++++++++
 3 files changed, 105 insertions(+), 34 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
similarity index 60%
copy from src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
copy to src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
index d9291a8..f070c59 100644
--- a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.benchmark;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.benchmark.functions.LongSource;
 import org.apache.flink.benchmark.functions.QueuingLongSource;
-import org.apache.flink.benchmark.operators.MultiplyByTwoCoStreamMap;
+import org.apache.flink.benchmark.operators.MultiplyByTwoOperatorFactory;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
@@ -34,67 +36,68 @@ import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
-public class TwoInputBenchmark extends BenchmarkBase {
+public class MultipleInputBenchmark extends BenchmarkBase {
 
-	public static final int RECORDS_PER_INVOCATION = 25_000_000;
-
-	public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 15_000_000;
-
-	private static final long CHECKPOINT_INTERVAL_MS = 100;
+	public static final int RECORDS_PER_INVOCATION = TwoInputBenchmark.RECORDS_PER_INVOCATION;
+	public static final int ONE_IDLE_RECORDS_PER_INVOCATION = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION;
+	public static final long CHECKPOINT_INTERVAL_MS = TwoInputBenchmark.CHECKPOINT_INTERVAL_MS;
 
 	public static void main(String[] args)
 		throws RunnerException {
 		Options options = new OptionsBuilder()
 			.verbosity(VerboseMode.NORMAL)
-			.include(".*" + TwoInputBenchmark.class.getCanonicalName() + ".*")
+			.include(".*" + MultipleInputBenchmark.class.getSimpleName() + ".*")
 			.build();
 
 		new Runner(options).run();
 	}
 
 	@Benchmark
-	@OperationsPerInvocation(value = TwoInputBenchmark.RECORDS_PER_INVOCATION)
-	public void twoInputMapSink(FlinkEnvironmentContext context) throws Exception {
+	@OperationsPerInvocation(RECORDS_PER_INVOCATION)
+	public void multiInputMapSink(FlinkEnvironmentContext context) throws Exception {
 
 		StreamExecutionEnvironment env = context.env;
-
 		env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-		env.setParallelism(1);
-
-		// Setting buffer timeout to 1 is an attempt to improve twoInputMapSink benchmark stability.
-		// Without 1ms buffer timeout, some JVM forks are much slower then others, making results
-		// unstable and unreliable.
-		env.setBufferTimeout(1);
 
 		long numRecordsPerInput = RECORDS_PER_INVOCATION / 2;
 		DataStreamSource<Long> source1 = env.addSource(new LongSource(numRecordsPerInput));
 		DataStreamSource<Long> source2 = env.addSource(new LongSource(numRecordsPerInput));
-
-		source1
-			.connect(source2)
-			.transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap())
-			.addSink(new DiscardingSink<>());
+		connectAndDiscard(env, source1, source2);
 
 		env.execute();
 	}
 
 	@Benchmark
-	@OperationsPerInvocation(value = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION)
-	public void twoInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception {
+	@OperationsPerInvocation(ONE_IDLE_RECORDS_PER_INVOCATION)
+	public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception {
 
 		StreamExecutionEnvironment env = context.env;
 		env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-		env.setParallelism(1);
 
 		QueuingLongSource.reset();
 		DataStreamSource<Long> source1 = env.addSource(new QueuingLongSource(1, ONE_IDLE_RECORDS_PER_INVOCATION - 1));
 		DataStreamSource<Long> source2 = env.addSource(new QueuingLongSource(2, 1));
-
-		source1
-				.connect(source2)
-				.transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap())
-				.addSink(new DiscardingSink<>());
+		connectAndDiscard(env, source1, source2);
 
 		env.execute();
 	}
+
+	private static void connectAndDiscard(
+			StreamExecutionEnvironment env,
+			DataStreamSource<Long> source1,
+			DataStreamSource<Long> source2) {
+		MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
+				"custom operator",
+				new MultiplyByTwoOperatorFactory(),
+				BasicTypeInfo.LONG_TYPE_INFO,
+				1);
+
+		transform.addInput(source1.getTransformation());
+		transform.addInput(source2.getTransformation());
+
+		env.addOperator(transform);
+		new MultipleConnectedStreams(env)
+				.transform(transform)
+				.addSink(new DiscardingSink<>());
+	}
 }
diff --git a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
index d9291a8..fe39549 100644
--- a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
@@ -37,10 +37,8 @@ import org.openjdk.jmh.runner.options.VerboseMode;
 public class TwoInputBenchmark extends BenchmarkBase {
 
 	public static final int RECORDS_PER_INVOCATION = 25_000_000;
-
 	public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 15_000_000;
-
-	private static final long CHECKPOINT_INTERVAL_MS = 100;
+	public static final long CHECKPOINT_INTERVAL_MS = 100;
 
 	public static void main(String[] args)
 		throws RunnerException {
diff --git a/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java b/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java
new file mode 100644
index 0000000..657bc3f
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.operators;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Arrays;
+import java.util.List;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class MultiplyByTwoOperatorFactory extends AbstractStreamOperatorFactory<Long> {
+	@Override
+	public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
+		return (T) new MultiplyByTwoOperator(parameters);
+	}
+
+	@Override
+	public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+		return MultiplyByTwoOperator.class;
+	}
+
+	public static class MultiplyByTwoOperator extends AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long> {
+		public MultiplyByTwoOperator(StreamOperatorParameters<Long> parameters) {
+			super(parameters, 2);
+		}
+
+		@Override
+		public List<Input> getInputs() {
+			return Arrays.asList(
+					new MultiplyByTwoOperator.MultiplyByTwoInput(this, 1),
+					new MultiplyByTwoOperator.MultiplyByTwoInput(this, 2));
+		}
+
+		private static class MultiplyByTwoInput extends AbstractInput<Long, Long> {
+			MultiplyByTwoInput(
+					AbstractStreamOperatorV2<Long> owner,
+					int inputId) {
+				super(owner, inputId);
+			}
+
+			@Override
+			public void processElement(StreamRecord<Long> element) {
+				output.collect(element.replace(element.getValue() * 2));
+			}
+		}
+	}
+}


[flink-benchmarks] 01/03: [hotfix] Do not restart benchmarks in case of failures

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 479f7f7c19507b72f03d777df5bf615720907068
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Aug 31 15:46:54 2020 +0200

    [hotfix] Do not restart benchmarks in case of failures
    
    This is in order to prevent faulty benchmarks from being restarted indefinitely,
    which would effectively dead-/livelock a benchmark.
---
 src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index c3d1b2c..c6bdbe8 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.benchmark;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -49,7 +50,7 @@ public class FlinkEnvironmentContext {
         if (objectReuse) {
             env.getConfig().enableObjectReuse();
         }
-
+        env.setRestartStrategy(RestartStrategies.noRestart());
         env.setStateBackend(new MemoryStateBackend());
     }
 


[flink-benchmarks] 03/03: Bump default Flink version to 1.12

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit af7b4275d28c20efc036eedff7f0184891900ec9
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Aug 26 16:48:23 2020 +0200

    Bump default Flink version to 1.12
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 3fbde9e..cfe0de8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@ under the License.
 
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>1.11-SNAPSHOT</flink.version>
+		<flink.version>1.12-SNAPSHOT</flink.version>
 		<flink.shaded.version>10.0</flink.shaded.version>
 		<netty.tcnative.version>2.0.25.Final</netty.tcnative.version>
 		<java.version>1.8</java.version>