You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 16:03:22 UTC

[beam] Diff for: [GitHub] tweise merged pull request #7461: [BEAM-6405] Let PortableValidatesRunner tests run in EMBEDDED environment

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4b45b17b8126..126b4c97dd2f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -251,6 +251,8 @@ class BeamModulePlugin implements Plugin<Project> {
     Integer numParallelTests = 1
     // Extra options to pass to TestPipeline
     String[] pipelineOpts = []
+    // Spin up the Harness inside a DOCKER container
+    Environment environment = Environment.DOCKER
     // Categories for tests to run.
     Closure testCategories = {
       includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
@@ -260,6 +262,12 @@ class BeamModulePlugin implements Plugin<Project> {
     }
     // Configuration for the classpath when running the test.
     Configuration testClasspathConfiguration
+
+    enum Environment {
+      DOCKER,   // Docker-based Harness execution
+      PROCESS,  // Process-based Harness execution
+      EMBEDDED, // Execute directly inside the execution engine (testing only)
+    }
   }
 
   def isRelease(Project project) {
@@ -1489,6 +1497,9 @@ class BeamModulePlugin implements Plugin<Project> {
         "--environmentCacheMillis=10000"
       ]
       beamTestPipelineOptions.addAll(config.pipelineOpts)
+      if (config.environment == PortableValidatesRunnerConfiguration.Environment.EMBEDDED) {
+        beamTestPipelineOptions += "--defaultEnvironmentType=EMBEDDED"
+      }
       if (config.jobServerConfig) {
         beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
       }
@@ -1500,7 +1511,9 @@ class BeamModulePlugin implements Plugin<Project> {
         testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
         maxParallelForks config.numParallelTests
         useJUnit(config.testCategories)
-        dependsOn ':beam-sdks-java-container:docker'
+        if (config.environment == PortableValidatesRunnerConfiguration.Environment.DOCKER) {
+          dependsOn ':beam-sdks-java-container:docker'
+        }
       }
     }
 
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index f94b238b0ed0..0611969c1e6b 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -1,3 +1,5 @@
+import org.apache.beam.gradle.BeamModulePlugin
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -106,7 +108,10 @@ runShadow {
 }
 
 def portableValidatesRunnerTask(String name, Boolean streaming) {
-  def pipelineOptions = ["--parallelism=2"]
+  def pipelineOptions = [
+      // Limit resource consumption via parallelism
+      "--parallelism=2",
+  ]
   if (streaming) {
     pipelineOptions += "--streaming"
   }
@@ -117,6 +122,7 @@ def portableValidatesRunnerTask(String name, Boolean streaming) {
     testClasspathConfiguration: configurations.validatesPortableRunner,
       numParallelTests: 2,
       pipelineOpts: pipelineOptions,
+      environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
       testCategories: {
         includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
         excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index adde82208727..9b9dcf33379b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -56,7 +56,8 @@ private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
                 BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL),
                 new ExternalEnvironmentFactory.Provider(),
                 Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
-                new EmbeddedEnvironmentFactory.Provider()));
+                new EmbeddedEnvironmentFactory.Provider(
+                    PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()))));
     return new FlinkDefaultExecutableStageContext(jobBundleFactory);
   }
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index b3a57a03235d..53284f22191b 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -40,7 +40,6 @@
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,6 +131,12 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
   /** Provider of EmbeddedEnvironmentFactory. */
   public static class Provider implements EnvironmentFactory.Provider {
 
+    private final PipelineOptions pipelineOptions;
+
+    public Provider(PipelineOptions pipelineOptions) {
+      this.pipelineOptions = pipelineOptions;
+    }
+
     @Override
     public EnvironmentFactory createEnvironmentFactory(
         GrpcFnServer<FnApiControlClientPoolService> controlServer,
@@ -141,7 +146,7 @@ public EnvironmentFactory createEnvironmentFactory(
         ControlClientPool clientPool,
         IdGenerator idGenerator) {
       return EmbeddedEnvironmentFactory.create(
-          PipelineOptionsFactory.create(), loggingServer, controlServer, clientPool.getSource());
+          pipelineOptions, loggingServer, controlServer, clientPool.getSource());
     }
 
     @Override


With regards,
Apache Git Services