You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 16:57:21 UTC

[GitHub] [beam] robertwb commented on a change in pull request #11792: WIP: Add ValidatesRunner task for local_job_service and Java SDK harness

robertwb commented on a change in pull request #11792:
URL: https://github.com/apache/beam/pull/11792#discussion_r430611411



##########
File path: runners/portability/java/build.gradle
##########
@@ -1,3 +1,13 @@
+import groovy.json.JsonOutput
+
+import java.nio.file.FileSystems

Review comment:
       You shouldn't have to watch for the service: when the `--background` flag is set, the command waits for the service to be up before it terminates.

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +45,123 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.virtualenvDir = "${project.buildDir}/virtualenv"
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+ext.pythonSdkDir = "${project.rootDir}/sdks/python"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
 }
+
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task virtualenv {

Review comment:
       Can we reuse the existing virtualenv tasks instead of defining a new one here?

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +45,123 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.virtualenvDir = "${project.buildDir}/virtualenv"
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+ext.pythonSdkDir = "${project.rootDir}/sdks/python"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
 }
+
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task virtualenv {
+  doLast {
+    exec {
+      commandLine "virtualenv", virtualenvDir, "--python=python3"
+    }
+    execInVirtualenv "pip", "install", "--retries", "10", "--upgrade", "tox==3.11.1", "--requirement", "${project.rootDir}/sdks/python/build-requirements.txt"
+    execInVirtualenv "python", "setup.py", "build", "--build-base=${buildDir}"
+    execInVirtualenv "pip", "install", "-e", "."
+  }
+}
+
+task startLocalJobService {
+  dependsOn virtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"
+
+    File pidFile = new File(localJobServicePidFile)
+    int totalSleep = 0
+    while (!pidFile.exists()) {
+      sleep(500)

Review comment:
       We shouldn't have to wait: once the above exec completes, the PID file should either already be there or never appear. Does the above task fail if the return code is non-zero?

##########
File path: sdks/python/apache_beam/runners/portability/local_job_service_main.py
##########
@@ -99,11 +105,23 @@ def run(argv):
       options.port_file = os.path.splitext(options.pid_file)[0] + '.port'
       argv.append('--port_file')
       argv.append(options.port_file)
+
+    if not options.stdout_file:
+      raise RuntimeError('--stdout_file must be specified with --background')
+    stdout_dest = open(options.stdout_file, mode='w')
+
+    if options.stderr_file:
+      stderr_dest=open(options.stderr_file, mode='w')
+    else:
+      stderr_dest=subprocess.STDOUT
+
     subprocess.Popen([
         sys.executable,
         '-m',
         'apache_beam.runners.portability.local_job_service_main'
-    ] + argv)
+    ] + argv,
+        stderr=stderr_dest,

Review comment:
       I'm not sure what happens to these redirected stdout/stderr files when this parent process exits (and possibly tries to close them).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org