Posted to commits@beam.apache.org by ib...@apache.org on 2019/09/11 19:12:36 UTC
[beam] branch master updated: [BEAM-7967] Execute portable Flink application jar
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 81c44b4 [BEAM-7967] Execute portable Flink application jar
new c4674b6 Merge pull request #9408 from ibzib/flink-execute-jar
81c44b4 is described below
commit 81c44b446d40eff6812f45ed7c4e78e845f2eee2
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Tue Aug 27 13:39:02 2019 -0700
[BEAM-7967] Execute portable Flink application jar
---
.../job_PostCommit_PortableJar_Flink.groovy | 38 ++++++
runners/flink/job-server/flink_job_server.gradle | 17 +++
runners/flink/job-server/test_pipeline_jar.sh | 121 +++++++++++++++++++
.../beam/runners/flink/FlinkPipelineRunner.java | 74 ++++++++++++
.../jobsubmission/PortablePipelineJarUtils.java | 130 +++++++++++++++++++++
5 files changed, 380 insertions(+)
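At a high level, the flow this commit enables (and which the test script
below exercises) is: the Python SDK, given --output_executable_path,
packages the pipeline, its options, and its artifacts into a jar instead
of submitting it, and that jar is later executed directly. A minimal
sketch; my_pipeline.py and all paths here are placeholders:

    # Step 1: ask the FlinkRunner to write an executable jar rather than
    # run the pipeline.
    python my_pipeline.py \
      --runner FlinkRunner \
      --flink_job_server_jar /path/to/beam-runners-flink-job-server.jar \
      --output_executable_path /tmp/pipeline.jar

    # Step 2: execute the jar; FlinkPipelineRunner.main is its entry point.
    java -jar /tmp/pipeline.jar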
diff --git a/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
new file mode 100644
index 0000000..a2bc53e
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+// Tests creation and execution of portable pipeline Jars on the Flink runner.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_PortableJar_Flink',
+ 'Run PortableJar_Flink PostCommit', 'Flink Portable Jar Tests', this) {
+ description('Tests creation and execution of portable pipeline Jars on the Flink runner.')
+
+ // Set common parameters.
+ commonJobProperties.setTopLevelMainJobProperties(delegate)
+
+ // Gradle goals for this job.
+ steps {
+ gradle {
+ rootBuildScriptDir(commonJobProperties.checkoutDir)
+ tasks(':runners:flink:1.8:job-server:testPipelineJar')
+ commonJobProperties.setGradleSwitches(delegate)
+ }
+ }
+}
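For reference, the Gradle task this job runs can also be invoked locally
from a Beam checkout (a sketch; it requires docker and a Python 3.5
environment, per the task definition below):

    # Same task the post-commit job runs.
    ./gradlew :runners:flink:1.8:job-server:testPipelineJar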
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index 0f555fb..3789007 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -177,3 +177,20 @@ project.ext.validatesCrossLanguageRunner = createCrossLanguageValidatesRunnerTas
"--shutdownSourcesOnFinalWatermark",
]
)
+
+task testPipelineJar() {
+ dependsOn shadowJar
+ dependsOn ":sdks:python:container:py35:docker"
+ doLast {
+ exec {
+ executable "sh"
+ def options = [
+ "--flink_job_server_jar ${shadowJar.archivePath}",
+ "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
+ "--python_root_dir ${project.rootDir}/sdks/python",
+ "--python_version 3.5"
+ ]
+ args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}"
+ }
+ }
+}
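The task is a thin wrapper around the test script; the script can also be
called directly. A sketch with placeholder paths, matching the four flags
it parses:

    sh runners/flink/job-server/test_pipeline_jar.sh \
      --flink_job_server_jar /path/to/flink-job-server-shadow.jar \
      --env_dir /tmp/beam-test-env \
      --python_root_dir sdks/python \
      --python_version 3.5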
diff --git a/runners/flink/job-server/test_pipeline_jar.sh b/runners/flink/job-server/test_pipeline_jar.sh
new file mode 100755
index 0000000..c59facf
--- /dev/null
+++ b/runners/flink/job-server/test_pipeline_jar.sh
@@ -0,0 +1,121 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+set -v
+
+while [[ $# -gt 0 ]]
+do
+key="$1"
+case $key in
+ --flink_job_server_jar)
+ FLINK_JOB_SERVER_JAR="$2"
+ shift # past argument
+ shift # past value
+ ;;
+ --env_dir)
+ ENV_DIR="$2"
+ shift # past argument
+ shift # past value
+ ;;
+ --python_root_dir)
+ PYTHON_ROOT_DIR="$2"
+ shift # past argument
+ shift # past value
+ ;;
+ --python_version)
+ PYTHON_VERSION="$2"
+ shift # past argument
+ shift # past value
+ ;;
+ *) # unknown option
+ echo "Unknown option: $1"
+ exit 1
+ ;;
+esac
+done
+
+# Go to the root of the repository
+cd $(git rev-parse --show-toplevel)
+
+# Verify docker command exists
+command -v docker
+docker -v
+
+CONTAINER=$USER-docker-apache.bintray.io/beam/python$PYTHON_VERSION
+TAG=latest
+# Verify container has already been built
+docker images $CONTAINER:$TAG | grep $TAG
+
+# Set up Python environment
+virtualenv -p python$PYTHON_VERSION $ENV_DIR
+. $ENV_DIR/bin/activate
+pip install --retries 10 -e $PYTHON_ROOT_DIR
+
+PIPELINE_PY="
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import Create
+from apache_beam.transforms import Map
+
+# To test that our main session is getting plumbed through artifact staging
+# correctly, create a global variable. If the main session is not plumbed
+# through properly, global_var will be undefined and the pipeline will fail.
+global_var = 1
+
+pipeline_options = PipelineOptions()
+pipeline_options.view_as(SetupOptions).save_main_session = True
+pipeline = beam.Pipeline(options=pipeline_options)
+pcoll = (pipeline
+ | Create([0, 1, 2])
+ | Map(lambda x: x + global_var))
+assert_that(pcoll, equal_to([1, 2, 3]))
+
+result = pipeline.run()
+result.wait_until_finish()
+"
+
+# Create the jar
+OUTPUT_JAR=flink-test-$(date +%Y%m%d-%H%M%S).jar
+(python -c "$PIPELINE_PY" \
+ --runner FlinkRunner \
+ --flink_job_server_jar $FLINK_JOB_SERVER_JAR \
+ --output_executable_path $OUTPUT_JAR \
+ --parallelism 1 \
+ --sdk_worker_parallelism 1 \
+ --environment_type DOCKER \
+ --environment_config=$CONTAINER:$TAG \
+) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+
+if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
+ # Execute the jar
+ java -jar $OUTPUT_JAR || TEST_EXIT_CODE=$?
+fi
+
+rm -rf $ENV_DIR
+rm -f $OUTPUT_JAR
+
+if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
+ echo ">>> SUCCESS"
+else
+ echo ">>> FAILURE"
+fi
+exit $TEST_EXIT_CODE
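Note the docker images check above assumes the SDK harness container has
already been built; the testPipelineJar task declares that dependency.
Building it by hand would look like this (a sketch):

    # Produces the container the script looks for, tagged
    # $USER-docker-apache.bintray.io/beam/python3.5:latest.
    ./gradlew :sdks:python:container:py35:docker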
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index e61cb12..f33af5c0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,25 +17,37 @@
*/
package org.apache.beam.runners.flink;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,4 +132,66 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
return flinkRunnerResult;
}
}
+
+ /**
+ * Main method, to be called only as the entry point of an executable jar whose structure is
+ * defined in {@link PortablePipelineJarUtils}.
+ */
+ public static void main(String[] args) throws Exception {
+ // Register standard file systems.
+ FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
+
+ FlinkPipelineRunnerConfiguration configuration = parseArgs(args);
+ Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath();
+ Struct options = PortablePipelineJarUtils.getPipelineOptionsFromClasspath();
+ FlinkPipelineOptions flinkOptions =
+ PipelineOptionsTranslation.fromProto(options).as(FlinkPipelineOptions.class);
+ String invocationId =
+ String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+ ProxyManifest proxyManifest = PortablePipelineJarUtils.getArtifactManifestFromClassPath();
+ String retrievalToken =
+ PortablePipelineJarUtils.stageArtifacts(
+ proxyManifest, flinkOptions, invocationId, configuration.artifactStagingPath);
+
+ FlinkPipelineRunner runner =
+ new FlinkPipelineRunner(
+ flinkOptions,
+ configuration.flinkConfDir,
+ detectClassPathResourcesToStage(FlinkPipelineRunner.class.getClassLoader()));
+ JobInfo jobInfo =
+ JobInfo.create(invocationId, flinkOptions.getJobName(), retrievalToken, options);
+ try {
+ runner.run(pipeline, jobInfo);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Job %s failed.", invocationId), e);
+ }
+ LOG.info("Job {} finished successfully.", invocationId);
+ }
+
+ private static class FlinkPipelineRunnerConfiguration {
+ @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
+ private String artifactStagingPath =
+ Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
+
+ @Option(
+ name = "--flink-conf-dir",
+ usage =
+ "Directory containing Flink YAML configuration files. "
+ + "These properties will be set to all jobs submitted to Flink and take precedence "
+ + "over configurations in FLINK_CONF_DIR.")
+ private String flinkConfDir = null;
+ }
+
+ private static FlinkPipelineRunnerConfiguration parseArgs(String[] args) {
+ FlinkPipelineRunnerConfiguration configuration = new FlinkPipelineRunnerConfiguration();
+ CmdLineParser parser = new CmdLineParser(configuration);
+ try {
+ parser.parseArgument(args);
+ } catch (CmdLineException e) {
+ LOG.error("Unable to parse command line arguments.", e);
+ parser.printUsage(System.err);
+ throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+ }
+ return configuration;
+ }
}
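Because main() above is the jar's entry point, the args4j options map
directly onto command-line flags when running the jar. A sketch, with
placeholder paths:

    # Override the staging directory and Flink configuration defaults
    # declared in FlinkPipelineRunnerConfiguration.
    java -jar /tmp/pipeline.jar \
      --artifacts-dir /mnt/shared/beam-artifact-staging \
      --flink-conf-dir /etc/flink/conf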
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
index af712ad..5045f3b 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineJarUtils.java
@@ -17,6 +17,38 @@
*/
package org.apache.beam.runners.fnexecution.jobsubmission;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Message.Builder;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.JsonFormat;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.apache.commons.compress.utils.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Contains common code for writing and reading portable pipeline jars.
*
@@ -51,4 +83,102 @@ public abstract class PortablePipelineJarUtils {
ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json";
static final String PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json";
static final String PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + "/pipeline-options.json";
+
+ private static final Logger LOG = LoggerFactory.getLogger(PortablePipelineJarUtils.class);
+
+ private static InputStream getResourceFromClassPath(String resourcePath) throws IOException {
+ InputStream inputStream = PortablePipelineJarUtils.class.getResourceAsStream(resourcePath);
+ if (inputStream == null) {
+ throw new FileNotFoundException(
+ String.format("Resource %s not found on classpath.", resourcePath));
+ }
+ return inputStream;
+ }
+
+ /** Populates {@code builder} using the JSON resource specified by {@code resourcePath}. */
+ private static void parseJsonResource(String resourcePath, Builder builder) throws IOException {
+ try (InputStream inputStream = getResourceFromClassPath(resourcePath)) {
+ String contents = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8);
+ JsonFormat.parser().merge(contents, builder);
+ }
+ }
+
+ public static Pipeline getPipelineFromClasspath() throws IOException {
+ Pipeline.Builder builder = Pipeline.newBuilder();
+ parseJsonResource("/" + PIPELINE_PATH, builder);
+ return builder.build();
+ }
+
+ public static Struct getPipelineOptionsFromClasspath() throws IOException {
+ Struct.Builder builder = Struct.newBuilder();
+ parseJsonResource("/" + PIPELINE_OPTIONS_PATH, builder);
+ return builder.build();
+ }
+
+ public static ProxyManifest getArtifactManifestFromClassPath() throws IOException {
+ ProxyManifest.Builder builder = ProxyManifest.newBuilder();
+ parseJsonResource("/" + ARTIFACT_MANIFEST_PATH, builder);
+ return builder.build();
+ }
+
+ /** Stages artifacts listed in {@code proxyManifest} and returns a retrieval token. */
+ public static String stageArtifacts(
+ ProxyManifest proxyManifest,
+ PipelineOptions options,
+ String invocationId,
+ String artifactStagingPath)
+ throws Exception {
+ Collection<StagedFile> filesToStage =
+ prepareArtifactsForStaging(proxyManifest, options, invocationId);
+ try (GrpcFnServer artifactServer =
+ GrpcFnServer.allocatePortAndCreateFor(
+ new BeamFileSystemArtifactStagingService(), InProcessServerFactory.create())) {
+ ManagedChannel grpcChannel =
+ InProcessManagedChannelFactory.create()
+ .forDescriptor(artifactServer.getApiServiceDescriptor());
+ ArtifactServiceStager stager = ArtifactServiceStager.overChannel(grpcChannel);
+ String stagingSessionToken =
+ BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+ invocationId, artifactStagingPath);
+ String retrievalToken = stager.stage(stagingSessionToken, filesToStage);
+ // Clean up.
+ for (StagedFile file : filesToStage) {
+ if (!file.getFile().delete()) {
+ LOG.warn("Failed to delete file {}", file.getFile());
+ }
+ }
+ grpcChannel.shutdown();
+ return retrievalToken;
+ }
+ }
+
+ /**
+ * Artifacts are expected to exist as resources on the classpath, located using {@code
+ * proxyManifest}. Writes them to temporary files so they can be staged.
+ */
+ private static Collection<StagedFile> prepareArtifactsForStaging(
+ ProxyManifest proxyManifest, PipelineOptions options, String invocationId)
+ throws IOException {
+ List<StagedFile> filesToStage = new ArrayList<>();
+ Path outputFolderPath =
+ Paths.get(
+ MoreObjects.firstNonNull(
+ options.getTempLocation(), System.getProperty("java.io.tmpdir")),
+ invocationId);
+ if (!outputFolderPath.toFile().mkdir()) {
+ throw new IOException("Failed to create folder " + outputFolderPath);
+ }
+ for (Location location : proxyManifest.getLocationList()) {
+ try (InputStream inputStream = getResourceFromClassPath(location.getUri())) {
+ Path outputPath = outputFolderPath.resolve(UUID.randomUUID().toString());
+ LOG.trace("Writing artifact {} to file {}", location.getName(), outputPath);
+ File file = outputPath.toFile();
+ try (FileOutputStream outputStream = new FileOutputStream(file)) {
+ IOUtils.copy(inputStream, outputStream);
+ filesToStage.add(StagedFile.of(file, location.getName()));
+ }
+ }
+ }
+ return filesToStage;
+ }
}
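Tying the pieces together: the jar written by the SDK carries the pipeline
proto, its options, and the artifact manifest as classpath resources, which
the readers above parse back out of JSON. Listing such a jar might look
roughly like this (the folder prefixes come from constants outside this
hunk, so the exact paths shown are assumptions):

    # File names match the *_PATH constants above; the directory layout is
    # illustrative only.
    jar tf /tmp/pipeline.jar | grep json
    #   .../pipeline.json            (RunnerApi.Pipeline, JSON-encoded)
    #   .../pipeline-options.json    (pipeline options as a protobuf Struct)
    #   .../artifact-manifest.json   (ProxyManifest locating the artifacts)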