You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/10 12:22:19 UTC
[flink] branch master updated: [FLINK-17795][example] Add
MatrixVectorMul example
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0c9e7b2 [FLINK-17795][example] Add MatrixVectorMul example
0c9e7b2 is described below
commit 0c9e7b21897cc3b4258ff21b295c9e0e8d7cd13f
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon May 25 18:31:41 2020 +0800
[FLINK-17795][example] Add MatrixVectorMul example
This closes #12398.
---
flink-dist/src/main/assemblies/bin.xml | 1 +
flink-examples/flink-examples-streaming/pom.xml | 68 ++++++
.../streaming/examples/gpu/MatrixVectorMul.java | 244 +++++++++++++++++++++
3 files changed, 313 insertions(+)
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index bc8bb4f..c68f9d6 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -235,6 +235,7 @@ under the License.
<excludes>
<exclude>flink-examples-streaming*.jar</exclude>
<exclude>original-*.jar</exclude>
+ <exclude>MatrixVectorMul.jar</exclude>
</excludes>
</fileSet>
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index a055d5e..0bd841e 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -34,6 +34,11 @@ under the License.
<packaging>jar</packaging>
+ <!-- Allow users to pass custom jcuda versions -->
+ <properties>
+ <jcuda.version>10.0.0</jcuda.version>
+ </properties>
+
<dependencies>
<!-- core dependencies -->
@@ -88,6 +93,33 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <!-- Dependencies for MatrixVectorMul. We exclude the native libraries
+ because they are not available for all operating systems and architectures. Moreover,
+ we want to enable users to compile and run MatrixVectorMul in different runtime environments.-->
+ <dependency>
+ <groupId>org.jcuda</groupId>
+ <artifactId>jcuda</artifactId>
+ <version>${jcuda.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jcuda</groupId>
+ <artifactId>jcuda-natives</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jcuda</groupId>
+ <artifactId>jcublas</artifactId>
+ <version>${jcuda.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jcuda</groupId>
+ <artifactId>jcublas-natives</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
<build>
@@ -365,6 +397,42 @@ under the License.
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>MatrixVectorMul</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <finalName>MatrixVectorMul</finalName>
+ <artifactSet>
+ <includes>
+ <include>org.jcuda:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>org.apache.flink:*</artifact>
+ <includes>
+ <include>org/apache/flink/streaming/examples/gpu/MatrixVectorMul.class</include>
+ <include>org/apache/flink/streaming/examples/gpu/MatrixVectorMul$*.class</include>
+ </includes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.examples.gpu.MatrixVectorMul</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<!-- Scala Compiler -->
<plugin>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
new file mode 100644
index 0000000..a0177c2
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import jcuda.Pointer;
+import jcuda.Sizeof;
+import jcuda.jcublas.JCublas;
+import jcuda.runtime.JCuda;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Implements the matrix-vector multiplication program that shows how to use GPU resources in Flink.
+ *
+ * <p>The input is a vector stream from a {@link RandomVectorSource}, which will generate random vectors with specified
+ * dimension. The data size of the vector stream could be specified by user. Each vector will be multiplied with a random
+ * dimension * dimension matrix in {@link Multiplier} and the result would be emitted to output.
+ *
+ * <p>Usage: MatrixVectorMul [--output <path>] [--dimension <dimension> --data-size <data_size>]
+ *
+ * <p>If no parameters are provided, the program is run with default vector dimension 10 and data size 100.
+ *
+ * <p>This example shows how to:
+ * <ul>
+ * <li>leverage external resource in operators,
+ * <li>accelerate complex calculation with GPU resources.
+ * </ul>
+ *
+ * <p>Notice that you need to add JCuda natives libraries in your Flink distribution by the following steps:
+ * <ul>
+ * <li>download the JCuda native libraries bundle for your CUDA version from http://www.jcuda.org/downloads/
+ * <li>copy the native libraries jcuda-natives and jcublas-natives for your CUDA version, operating system and architecture
+ * to the "lib/" folder of your Flink distribution
+ * </ul>
+ */
+public class MatrixVectorMul {
+
+ private static final int DEFAULT_DIM = 10;
+ private static final int DEFAULT_DATA_SIZE = 100;
+ private static final String DEFAULT_RESOURCE_NAME = "gpu";
+
+ public static void main(String[] args) throws Exception {
+
+ // Checking input parameters
+ final ParameterTool params = ParameterTool.fromArgs(args);
+ System.out.println("Usage: MatrixVectorMul [--output <path>] [--dimension <dimension> --data-size <data_size>] [--resource-name <resource_name>]");
+
+ // Set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(params);
+
+ final int dimension = params.getInt("dimension", DEFAULT_DIM);
+ final int dataSize = params.getInt("data-size", DEFAULT_DATA_SIZE);
+ final String resourceName = params.get("resource-name", DEFAULT_RESOURCE_NAME);
+
+ DataStream<List<Float>> result = env.addSource(new RandomVectorSource(dimension, dataSize))
+ .map(new Multiplier(dimension, resourceName));
+
+ // Emit result
+ if (params.has("output")) {
+ result.addSink(StreamingFileSink.forRowFormat(new Path(params.get("output")),
+ new SimpleStringEncoder<List<Float>>()).build());
+ } else {
+ System.out.println("Printing result to stdout. Use --output to specify output path.");
+ result.print();
+ }
+ // Execute program
+ env.execute("Matrix-Vector Multiplication");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Random vector source which generates random vectors with specified dimension and total data size.
+ */
+ private static final class RandomVectorSource extends RichSourceFunction<List<Float>> {
+
+ private transient volatile boolean running;
+ private final int dimension;
+ private final int dataSize;
+
+ RandomVectorSource(int dimension, int dataSize) {
+ this.dimension = dimension;
+ this.dataSize = dataSize;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ running = true;
+ }
+
+ @Override
+ public void run(SourceContext<List<Float>> ctx) {
+ int count = 0;
+ while (running && count < dataSize) {
+ List<Float> randomRecord = new ArrayList<>();
+ for (int i = 0; i < dimension; ++i) {
+ randomRecord.add((float) Math.random());
+ }
+ ctx.collect(randomRecord);
+ count += 1;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ /**
+ * Matrix-Vector multiplier using CUBLAS library.
+ */
+ private static final class Multiplier extends RichMapFunction<List<Float>, List<Float>> {
+ private final int dimension;
+ private final String resourceName;
+ private Pointer matrixPointer;
+
+ Multiplier(int dimension, String resourceName) {
+ this.dimension = dimension;
+ this.resourceName = resourceName;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // When multiple instances of this class and JCuda exist in different class loaders, then we will get UnsatisfiedLinkError.
+ // To avoid that, we need to temporarily override the java.io.tmpdir, where the JCuda store its native library, with a random path.
+ // For more details please refer to https://issues.apache.org/jira/browse/FLINK-5408 and the discussion in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Classloader-and-removal-of-native-libraries-td14808.html
+ final String originTempDir = System.getProperty("java.io.tmpdir");
+ final String newTempDir = originTempDir + "/jcuda-" + UUID.randomUUID();
+ System.setProperty("java.io.tmpdir", newTempDir);
+
+ final Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName);
+ Preconditions.checkState(!externalResourceInfos.isEmpty(), "The MatrixVectorMul needs at least one GPU device while finding 0 GPU.");
+ final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");
+ Preconditions.checkState(firstIndexOptional.isPresent());
+
+ matrixPointer = new Pointer();
+ final float[] matrix = new float[dimension * dimension];
+ // Initialize a random matrix
+ for (int i = 0; i < dimension * dimension; ++i) {
+ matrix[i] = (float) Math.random();
+ }
+
+ // Set the CUDA device
+ JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
+
+ // Initialize JCublas
+ JCublas.cublasInit();
+
+ // Allocate device memory for the matrix
+ JCublas.cublasAlloc(dimension * dimension, Sizeof.FLOAT, matrixPointer);
+ JCublas.cublasSetVector(dimension * dimension, Sizeof.FLOAT, Pointer.to(matrix), 1, matrixPointer, 1);
+
+ // Change the java.io.tmpdir back to its original value.
+ System.setProperty("java.io.tmpdir", originTempDir);
+ }
+
+ @Override
+ public List<Float> map(List<Float> value) {
+ final float[] input = new float[dimension];
+ final float[] output = new float[dimension];
+ final Pointer inputPointer = new Pointer();
+ final Pointer outputPointer = new Pointer();
+
+ // Fill the input and output vector
+ for (int i = 0; i < dimension; i++) {
+ input[i] = value.get(i);
+ output[i] = 0;
+ }
+
+ // Allocate device memory for the input and output
+ JCublas.cublasAlloc(dimension, Sizeof.FLOAT, inputPointer);
+ JCublas.cublasAlloc(dimension, Sizeof.FLOAT, outputPointer);
+
+ // Initialize the device matrices
+ JCublas.cublasSetVector(dimension, Sizeof.FLOAT, Pointer.to(input), 1, inputPointer, 1);
+ JCublas.cublasSetVector(dimension, Sizeof.FLOAT, Pointer.to(output), 1, outputPointer, 1);
+
+ // Performs operation using JCublas
+ JCublas.cublasSgemv('n', dimension, dimension, 1.0f,
+ matrixPointer, dimension, inputPointer, 1, 0.0f, outputPointer, 1);
+
+ // Read the result back
+ JCublas.cublasGetVector(dimension, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);
+
+ // Memory clean up
+ JCublas.cublasFree(inputPointer);
+ JCublas.cublasFree(outputPointer);
+
+ List<Float> outputList = new ArrayList<>();
+ for (int i = 0; i < dimension; ++i) {
+ outputList.add(output[i]);
+ }
+
+ return outputList;
+ }
+
+ @Override
+ public void close() {
+ // Memory clean up
+ JCublas.cublasFree(matrixPointer);
+
+ // Shutdown cublas
+ JCublas.cublasShutdown();
+ }
+ }
+}