You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 16:03:22 UTC
[beam] Diff for: [GitHub] tweise merged pull request #7461: [BEAM-6405] Let
PortableValidatesRunner tests run in EMBEDDED environment
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4b45b17b8126..126b4c97dd2f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -251,6 +251,8 @@ class BeamModulePlugin implements Plugin<Project> {
Integer numParallelTests = 1
// Extra options to pass to TestPipeline
String[] pipelineOpts = []
+ // Spin up the Harness inside a DOCKER container
+ Environment environment = Environment.DOCKER
// Categories for tests to run.
Closure testCategories = {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
@@ -260,6 +262,12 @@ class BeamModulePlugin implements Plugin<Project> {
}
// Configuration for the classpath when running the test.
Configuration testClasspathConfiguration
+
+ enum Environment {
+ DOCKER, // Docker-based Harness execution
+ PROCESS, // Process-based Harness execution
+ EMBEDDED, // Execute directly inside the execution engine (testing only)
+ }
}
def isRelease(Project project) {
@@ -1489,6 +1497,9 @@ class BeamModulePlugin implements Plugin<Project> {
"--environmentCacheMillis=10000"
]
beamTestPipelineOptions.addAll(config.pipelineOpts)
+ if (config.environment == PortableValidatesRunnerConfiguration.Environment.EMBEDDED) {
+ beamTestPipelineOptions += "--defaultEnvironmentType=EMBEDDED"
+ }
if (config.jobServerConfig) {
beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
}
@@ -1500,7 +1511,9 @@ class BeamModulePlugin implements Plugin<Project> {
testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
maxParallelForks config.numParallelTests
useJUnit(config.testCategories)
- dependsOn ':beam-sdks-java-container:docker'
+ if (config.environment == PortableValidatesRunnerConfiguration.Environment.DOCKER) {
+ dependsOn ':beam-sdks-java-container:docker'
+ }
}
}
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index f94b238b0ed0..0611969c1e6b 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -1,3 +1,5 @@
+import org.apache.beam.gradle.BeamModulePlugin
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -106,7 +108,10 @@ runShadow {
}
def portableValidatesRunnerTask(String name, Boolean streaming) {
- def pipelineOptions = ["--parallelism=2"]
+ def pipelineOptions = [
+ // Limit resource consumption via parallelism
+ "--parallelism=2",
+ ]
if (streaming) {
pipelineOptions += "--streaming"
}
@@ -117,6 +122,7 @@ def portableValidatesRunnerTask(String name, Boolean streaming) {
testClasspathConfiguration: configurations.validatesPortableRunner,
numParallelTests: 2,
pipelineOpts: pipelineOptions,
+ environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
testCategories: {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index adde82208727..9b9dcf33379b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -56,7 +56,8 @@ private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL),
new ExternalEnvironmentFactory.Provider(),
Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
- new EmbeddedEnvironmentFactory.Provider()));
+ new EmbeddedEnvironmentFactory.Provider(
+ PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()))));
return new FlinkDefaultExecutableStageContext(jobBundleFactory);
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index b3a57a03235d..53284f22191b 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -40,7 +40,6 @@
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,6 +131,12 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
/** Provider of EmbeddedEnvironmentFactory. */
public static class Provider implements EnvironmentFactory.Provider {
+ private final PipelineOptions pipelineOptions;
+
+ public Provider(PipelineOptions pipelineOptions) {
+ this.pipelineOptions = pipelineOptions;
+ }
+
@Override
public EnvironmentFactory createEnvironmentFactory(
GrpcFnServer<FnApiControlClientPoolService> controlServer,
@@ -141,7 +146,7 @@ public EnvironmentFactory createEnvironmentFactory(
ControlClientPool clientPool,
IdGenerator idGenerator) {
return EmbeddedEnvironmentFactory.create(
- PipelineOptionsFactory.create(), loggingServer, controlServer, clientPool.getSource());
+ pipelineOptions, loggingServer, controlServer, clientPool.getSource());
}
@Override
With regards,
Apache Git Services