You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/21 14:11:01 UTC

[flink-benchmarks] branch master updated: [FLINK-18631] benchmark for scala ADT source

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e081d5  [FLINK-18631] benchmark for scala ADT source
1e081d5 is described below

commit 1e081d5d052aaf031d270bbae0a9b95ff527d6f8
Author: Grebennikov Roman <gr...@dfdx.me>
AuthorDate: Mon Jul 20 12:06:57 2020 +0200

    [FLINK-18631] benchmark for scala ADT source
---
 pom.xml                                            | 22 +++++++++++++
 .../full/SerializationFrameworkAllBenchmarks.java  | 15 +++++++++
 .../functions/BaseSourceWithKeyRange.java          |  1 +
 .../flink/benchmark/functions/ScalaADTSource.scala | 38 ++++++++++++++++++++++
 4 files changed, 76 insertions(+)

diff --git a/pom.xml b/pom.xml
index 38f9df8..3fbde9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -435,6 +435,28 @@ under the License.
 		</extensions>
 		<plugins>
 			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>4.4.0</version>
+				<executions>
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>add-source</goal>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>${maven.compiler.version}</version>
diff --git a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
index 5c83e15..d9efbb7 100644
--- a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.benchmark.FlinkEnvironmentContext;
 import org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks;
 import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange;
+import org.apache.flink.benchmark.functions.ScalaADTSource;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -117,6 +118,20 @@ public class SerializationFrameworkAllBenchmarks extends SerializationFrameworkM
 
 	@Benchmark
 	@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
+	public void serializerScalaADT(FlinkEnvironmentContext context) throws Exception {
+		StreamExecutionEnvironment env = context.env;
+		env.setParallelism(4);
+
+		env.addSource(new ScalaADTSource(RECORDS_PER_INVOCATION), ScalaADTSource.adtTypeInfo())
+				.rebalance()
+				.addSink(new DiscardingSink<>());
+
+		env.execute();
+	}
+
+
+	@Benchmark
+	@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
 	public void serializerKryoThrift(FlinkEnvironmentContext context) throws Exception {
 		StreamExecutionEnvironment env = context.env;
 		env.setParallelism(4);
diff --git a/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java b/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java
index 21cd586..d65f34c 100644
--- a/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java
+++ b/src/main/java/org/apache/flink/benchmark/functions/BaseSourceWithKeyRange.java
@@ -20,6 +20,7 @@ package org.apache.flink.benchmark.functions;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 
 /**
  * Abstract base class for sources with a defined number of events and a fixed key range.
diff --git a/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala b/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala
new file mode 100644
index 0000000..78622e2
--- /dev/null
+++ b/src/main/scala/org/apache/flink/benchmark/functions/ScalaADTSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.benchmark.functions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.benchmark.functions.ScalaADTSource.{ADT, ADT1, ADT2}
+
+class ScalaADTSource(numEvents: Int) extends BaseSourceWithKeyRange[ADT](numEvents,2) {
+  override protected def getElement(keyId: Int): ADT = keyId match {
+    case 0 => ADT1("a")
+    case 1 => ADT2(1)
+  }
+}
+
+object ScalaADTSource {
+  sealed trait ADT
+  case class ADT1(a: String) extends ADT
+  case class ADT2(a: Int) extends ADT
+
+  val adtTypeInfo = createTypeInformation[ADT]
+}
+
+