Posted to commits@beam.apache.org by ib...@apache.org on 2020/06/23 23:57:55 UTC

[beam] branch master updated: [BEAM-9872] Moved Spark validates tests to shared file (#12002)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 858f0b9  [BEAM-9872] Moved Spark validates tests to shared file (#12002)
858f0b9 is described below

commit 858f0b9b0e55f3ea66429dc36721d9f3ab17f084
Author: annaqin418 <an...@google.com>
AuthorDate: Tue Jun 23 19:57:38 2020 -0400

    [BEAM-9872] Moved Spark validates tests to shared file (#12002)
    
    * [BEAM-9872] Moved Spark validates tests to shared file

    * Combined Python versions in wrapper task

    * Spark tests only on Python 3

    * Changed env type

    * Changed env config

    * Changed environment_config

    * Added wrapper task in top-level build.gradle

    * Updated Groovy file to run the wrapper
---
 ..._PostCommit_Python_ValidatesRunner_Spark.groovy |   2 +-
 build.gradle                                       |   7 ++
 sdks/python/test-suites/portable/common.gradle     | 109 +++++++++++++++++----
 sdks/python/test-suites/portable/py2/build.gradle  |  48 ---------
 4 files changed, 98 insertions(+), 68 deletions(-)
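
For reference, a sketch of how the reorganized suite is typically invoked from
the repository root (task paths are taken from this diff; exact behavior depends
on the local setup):

    # Run the Spark ValidatesRunner suite across all Python versions
    ./gradlew :pythonSparkPostCommit

    # Or run a single Python version directly
    ./gradlew :sdks:python:test-suites:portable:py36:sparkValidatesRunner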

diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Spark.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Spark.groovy
index 59b45f7..1aad6d1 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Spark.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Spark.groovy
@@ -31,7 +31,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_VR_Spark',
   steps {
     gradle {
       rootBuildScriptDir(commonJobProperties.checkoutDir)
-      tasks(':sdks:python:test-suites:portable:py2:sparkValidatesRunner')
+      tasks(':pythonSparkPostCommit')
       commonJobProperties.setGradleSwitches(delegate)
     }
   }
diff --git a/build.gradle b/build.gradle
index 489a9d9..3ee0eac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -280,6 +280,13 @@ task portablePythonPreCommit() {
   dependsOn ":sdks:python:test-suites:portable:py37:preCommitPy37"
 }
 
+task pythonSparkPostCommit() {
+  dependsOn ":sdks:python:test-suites:portable:py2:sparkValidatesRunner"
+  dependsOn ":sdks:python:test-suites:portable:py35:sparkValidatesRunner"
+  dependsOn ":sdks:python:test-suites:portable:py36:sparkValidatesRunner"
+  dependsOn ":sdks:python:test-suites:portable:py37:sparkValidatesRunner"
+}
+
 task websitePreCommit() {
   dependsOn ":website:preCommit"
 }
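
Each dependsOn in the new wrapper resolves to the per-version sparkValidatesRunner
task that common.gradle (below) defines for every Python suite applying it, so this
wrapper is the single place to adjust when a Python version is added or dropped.
For example, a hypothetical future edit (not part of this commit) dropping py2
would only touch this block:

    task pythonSparkPostCommit() {
      // py2 removed; remaining versions unchanged
      dependsOn ":sdks:python:test-suites:portable:py35:sparkValidatesRunner"
      dependsOn ":sdks:python:test-suites:portable:py36:sparkValidatesRunner"
      dependsOn ":sdks:python:test-suites:portable:py37:sparkValidatesRunner"
    }
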
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 0d692b2..a29bdb4 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -1,3 +1,5 @@
+import org.apache.tools.ant.taskdefs.condition.Os
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -110,11 +112,11 @@ task crossLanguagePythonJavaKafkaIOFlink {
   doLast {
     def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
     def options = [
-        "--runner=FlinkRunner",
-        "--parallelism=2",
-        "--environment_type=DOCKER",
-        "--environment_cache_millis=10000",
-        "--experiment=beam_fn_api",
+            "--runner=FlinkRunner",
+            "--parallelism=2",
+            "--environment_type=DOCKER",
+            "--environment_cache_millis=10000",
+            "--experiment=beam_fn_api",
     ]
     exec {
       environment "LOCAL_KAFKA_JAR", kafkaJar
@@ -131,34 +133,103 @@ task crossLanguagePythonJavaKafkaIOFlink {
   }
 }
 
+task createProcessWorker {
+  dependsOn ':sdks:python:container:build'
+  dependsOn 'setupVirtualenv'
+  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
+  def osType = 'linux'
+  if (Os.isFamily(Os.FAMILY_MAC))
+    osType = 'darwin'
+  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
+  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \""
+  outputs.file sdkWorkerFile
+  doLast {
+    sdkWorkerFile.write sdkWorkerFileCode
+    exec {
+      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
+    }
+    exec {
+      commandLine('chmod', '+x', sdkWorkerFile)
+    }
+  }
+}
+
+def sparkCompatibilityMatrix = {
+  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
+  def workerType = config.workerType.name()
+  def streaming = config.streaming
+  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
+  def name = "sparkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
+  tasks.create(name: name) {
+    dependsOn 'createProcessWorker'
+    dependsOn 'setupVirtualenv'
+    dependsOn ':runners:spark:job-server:shadowJar'
+    doLast {
+      def argMap = [
+              "environment_type"    : workerType,
+              "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
+              "environment_cache_millis": 10000,
+      ]
+      def argString = mapToArgString(argMap)
+
+      // Optionally specify test function names separated by space e.g.:
+      // ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
+      // Otherwise run all test functions under SparkRunnerTest
+      def tests = project.hasProperty('tests') ?
+              project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''
+
+      exec {
+        executable 'sh'
+        args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString ${environment_config}"
+      }
+    }
+  }
+}
+
+task sparkCompatibilityMatrixDocker() {
+  dependsOn sparkCompatibilityMatrix(streaming: false)
+}
+
+task sparkCompatibilityMatrixProcess() {
+  dependsOn sparkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
+}
+
+task sparkCompatibilityMatrixLoopback() {
+  dependsOn sparkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
+}
+
+task sparkValidatesRunner() {
+  dependsOn 'sparkCompatibilityMatrixLoopback'
+}
+
 project.task("preCommitPy${pythonVersionSuffix}") {
-      dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
-      ':runners:flink:1.10:job-server:shadowJar',
-      'portableWordCountFlinkRunnerBatch',
-      'portableWordCountFlinkRunnerStreaming']
+  dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
+               ':runners:flink:1.10:job-server:shadowJar',
+               'portableWordCountFlinkRunnerBatch',
+               'portableWordCountFlinkRunnerStreaming']
 }
 
 project.task("postCommitPy${pythonVersionSuffix}") {
-      dependsOn = ['setupVirtualenv',
-      "postCommitPy${pythonVersionSuffix}IT",
-      ':runners:spark:job-server:shadowJar',
-      'portableWordCountSparkRunnerBatch']
+  dependsOn = ['setupVirtualenv',
+               "postCommitPy${pythonVersionSuffix}IT",
+               ':runners:spark:job-server:shadowJar',
+               'portableWordCountSparkRunnerBatch']
 }
 
 project.task("postCommitPy${pythonVersionSuffix}IT") {
   dependsOn = ['setupVirtualenv',
-  'installGcpTest',
-  ':runners:flink:1.10:job-server:shadowJar']
+               'installGcpTest',
+               ':runners:flink:1.10:job-server:shadowJar']
 
   doLast {
     def tests = [
-        "apache_beam.io.gcp.bigquery_read_it_test",
+            "apache_beam.io.gcp.bigquery_read_it_test",
     ]
     def testOpts = ["--tests=${tests.join(',')}"]
     def cmdArgs = mapToArgString([
-        "test_opts": testOpts,
-        "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
-        "pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
+            "test_opts": testOpts,
+            "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
+            "pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
     ])
     exec {
       executable 'sh'
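
Two notes on the common.gradle additions above. First, the task name built in
sparkCompatibilityMatrix expands to, e.g., sparkCompatibilityMatrixBatchLOOPBACK
or sparkCompatibilityMatrixBatchPROCESS; the Docker wrapper passes no workerType,
which suggests CompatibilityMatrixConfig defaults to DOCKER (the config class
itself is not part of this diff). For the PROCESS worker, createProcessWorker
writes a one-line sdk_worker.sh of roughly this shape, with the bracketed paths
filled in from the local build (illustrative, not verbatim output):

    sh -c "pip=`which pip` . <envdir>/bin/activate && <python-container-build-dir>/target/launcher/linux_amd64/boot $* "

Second, individual tests can still be selected with -Ptests; the in-code comment
above keeps the old py2 task path, but the same property should work for any
version that applies common.gradle, e.g.:

    ./gradlew :sdks:python:test-suites:portable:py37:sparkValidatesRunner -Ptests="test_external_transforms test_read"
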
diff --git a/sdks/python/test-suites/portable/py2/build.gradle b/sdks/python/test-suites/portable/py2/build.gradle
index 70373fd..2d9090a 100644
--- a/sdks/python/test-suites/portable/py2/build.gradle
+++ b/sdks/python/test-suites/portable/py2/build.gradle
@@ -132,52 +132,4 @@ task chicagoTaxiExample {
 
 /*************************************************************************************************/
 
-task createProcessWorker {
-  dependsOn ':sdks:python:container:build'
-  dependsOn 'setupVirtualenv'
-  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
-  def osType = 'linux'
-  if (Os.isFamily(Os.FAMILY_MAC))
-    osType = 'darwin'
-  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
-  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \""
-  outputs.file sdkWorkerFile
-  doLast {
-    sdkWorkerFile.write sdkWorkerFileCode
-    exec {
-      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
-    }
-    exec {
-      commandLine('chmod', '+x', sdkWorkerFile)
-    }
-  }
-}
-
-task sparkValidatesRunner() {
-  dependsOn 'createProcessWorker'
-  dependsOn 'setupVirtualenv'
-  dependsOn ':runners:spark:job-server:shadowJar'
-  doLast {
-    def environment_config = "'{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'"
-    def argMap = [
-        "environment_type"    : "PROCESS",
-        "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
-        "environment_config": environment_config,
-        "environment_cache_millis": 10000,
-    ]
-    def argString = mapToArgString(argMap)
-
-    // Optionally specify test function names separated by space e.g.:
-    // ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
-    // Otherwise run all test functions under SparkRunnerTest
-    def tests = project.hasProperty('tests') ?
-        project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''
-
-    exec {
-      executable 'sh'
-      args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString"
-    }
-  }
-}
-
 apply from: "../common.gradle"
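
One behavioral note on the py2 cleanup above: the deleted sparkValidatesRunner ran
against a hard-coded PROCESS worker, while the shared replacement in common.gradle
defaults to LOOPBACK (via sparkCompatibilityMatrixLoopback). Since py2/build.gradle
still applies common.gradle, the old PROCESS behavior should remain reachable
through the explicit matrix task, e.g.:

    ./gradlew :sdks:python:test-suites:portable:py2:sparkCompatibilityMatrixProcess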