You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/21 14:11:01 UTC
[flink-benchmarks] branch master updated: [FLINK-18631] benchmark
for scala ADT source
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new 1e081d5 [FLINK-18631] benchmark for scala ADT source
1e081d5 is described below
commit 1e081d5d052aaf031d270bbae0a9b95ff527d6f8
Author: Grebennikov Roman <gr...@dfdx.me>
AuthorDate: Mon Jul 20 12:06:57 2020 +0200
[FLINK-18631] benchmark for scala ADT source
---
pom.xml | 22 +++++++++++++
.../full/SerializationFrameworkAllBenchmarks.java | 15 +++++++++
.../functions/BaseSourceWithKeyRange.java | 1 +
.../flink/benchmark/functions/ScalaADTSource.scala | 38 ++++++++++++++++++++++
4 files changed, 76 insertions(+)
diff --git a/pom.xml b/pom.xml
index 38f9df8..3fbde9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -435,6 +435,28 @@ under the License.
</extensions>
<plugins>
<plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId> <!-- compiles the Scala sources of this project (e.g. src/main/scala) -->
+ <version>4.4.0</version>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase> <!-- bound to a phase that precedes Maven's compile phase, so Scala classes exist before the Java sources are compiled -->
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase> <!-- same idea for test sources: runs ahead of Maven's test-compile phase -->
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.version}</version>
diff --git a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
index 5c83e15..d9efbb7 100644
--- a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.benchmark.FlinkEnvironmentContext;
import org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks;
import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange;
+import org.apache.flink.benchmark.functions.ScalaADTSource;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -117,6 +118,20 @@ public class SerializationFrameworkAllBenchmarks extends SerializationFrameworkM
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
+ public void serializerScalaADT(FlinkEnvironmentContext context) throws Exception { // benchmark: runs a source -> rebalance -> discarding-sink job over Scala ADT records
+ StreamExecutionEnvironment env = context.env;
+ env.setParallelism(4);
+
+ env.addSource(new ScalaADTSource(RECORDS_PER_INVOCATION), ScalaADTSource.adtTypeInfo()) // type information is passed explicitly; it comes from the Scala object ScalaADTSource
+ .rebalance() // NOTE(review): presumably here so records cross a task boundary and are actually serialized - confirm
+ .addSink(new DiscardingSink<>());
+
+ env.execute();
+ }
+
+
+ @Benchmark
+ @OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerKryoThrift(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
diff --git a/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java b/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java
index 21cd586..d65f34c 100644
--- a/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java
+++ b/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java
@@ -20,6 +20,7 @@ package org.apache.flink.benchmark.functions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
/**
* Abstract base class for sources with a defined number of events and a fixed key range.
diff --git a/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala b/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala
new file mode 100644
index 0000000..78622e2
--- /dev/null
+++ b/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.benchmark.functions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.benchmark.functions.ScalaADTSource.{ADT, ADT1, ADT2}
+
+class ScalaADTSource(numEvents: Int) extends BaseSourceWithKeyRange[ADT](numEvents,2) { // source emitting ADT elements; the literal 2 is presumably the key range of the base class - TODO confirm
+ override protected def getElement(keyId: Int): ADT = keyId match { // maps a key id to one of the two ADT variants
+ case 0 => ADT1("a")
+ case 1 => ADT2(1)
+ } // no default case: any keyId other than 0 or 1 throws scala.MatchError
+}
+
+object ScalaADTSource { // companion object: declares the ADT hierarchy and its type information
+ sealed trait ADT // sealed: subtypes can only be declared in this source file
+ case class ADT1(a: String) extends ADT
+ case class ADT2(a: Int) extends ADT
+
+ val adtTypeInfo = createTypeInformation[ADT] // type information for ADT, created via the org.apache.flink.api.scala._ import; built once when the object is initialized
+}
+
+