You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/22 04:02:43 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #11792: WIP: Add ValidatesRunner task for local_job_service and Java SDK harness

kennknowles commented on a change in pull request #11792:
URL: https://github.com/apache/beam/pull/11792#discussion_r429029786



##########
File path: runners/portability/java/build.gradle
##########
@@ -1,3 +1,13 @@
+import groovy.json.JsonOutput
+
+import java.nio.file.FileSystems

Review comment:
       TODO(me): remove these imports
   
   I first went the "normal" route of using all this stuff to watch for the pid file, but it was verbose and had race conditions, so there was no point. Now the task just checks for the file and sleeps.

##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactRetrievalService.java
##########
@@ -39,7 +39,7 @@
 public class ArtifactRetrievalService
     extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase implements FnService {
 
-  public static final int DEFAULT_BUFFER_SIZE = 4 << 20; // 4 MB
+  public static final int DEFAULT_BUFFER_SIZE = 2 << 20; // 2 MB

Review comment:
       @robertwb @lukecwik The first thing I hit while putting this together was exceeding the message size limit.

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +45,123 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.virtualenvDir = "${project.buildDir}/virtualenv"
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+ext.pythonSdkDir = "${project.rootDir}/sdks/python"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
 }
+
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task virtualenv {
+  doLast {
+    exec {
+      commandLine "virtualenv", virtualenvDir, "--python=python3"
+    }
+    execInVirtualenv "pip", "install", "--retries", "10", "--upgrade", "tox==3.11.1", "--requirement", "${project.rootDir}/sdks/python/build-requirements.txt"
+    execInVirtualenv "python", "setup.py", "build", "--build-base=${buildDir}"
+    execInVirtualenv "pip", "install", "-e", "."
+  }
+}
+
+task startLocalJobService {
+  dependsOn virtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"

Review comment:
       It would be nicer for the port to be an output of this task that the other task reads in, but that plumbing seems unnatural based on the Gradle docs I could find.

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +45,123 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.virtualenvDir = "${project.buildDir}/virtualenv"
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+ext.pythonSdkDir = "${project.rootDir}/sdks/python"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
 }
+
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task virtualenv {
+  doLast {
+    exec {
+      commandLine "virtualenv", virtualenvDir, "--python=python3"
+    }
+    execInVirtualenv "pip", "install", "--retries", "10", "--upgrade", "tox==3.11.1", "--requirement", "${project.rootDir}/sdks/python/build-requirements.txt"
+    execInVirtualenv "python", "setup.py", "build", "--build-base=${buildDir}"
+    execInVirtualenv "pip", "install", "-e", "."
+  }
+}
+
+task startLocalJobService {
+  dependsOn virtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"
+
+    File pidFile = new File(localJobServicePidFile)
+    int totalSleep = 0
+    while (!pidFile.exists()) {
+      sleep(500)
+      totalSleep += 500
+      if (totalSleep > 5000) {
+        throw new RuntimeException("Local job service pid file never showed up");
+      }
+    }
+  }
+}
+
+task stopLocalJobService {
+  doLast {
+    execInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--stop",
+        "--pid_file=${localJobServicePidFile}"
+  }
+}
+
+startLocalJobService.finalizedBy stopLocalJobService
+
+/**
+ * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main
+ * with subprocess SDK harness environments.
+ */
+task ulrValidatesRunnerTests(type: Test) {
+  dependsOn ":sdks:java:container:docker"
+
+  if (!project.hasProperty("localJobServicePortFile")) {
+    dependsOn startLocalJobService
+  }
+
+  group = "Verification"
+  description "PortableRunner Java subprocess ValidatesRunner suite"
+  classpath = configurations.validatesRunner
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+      "--runner=TestUniversalRunner",
+      "--localJobServicePortFile=${localJobServicePortFile}"
+  ])
+  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
+  useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+  }
+  filter {
+    includeTestsMatching 'ImpulseTest'

Review comment:
       TODO(me): remove this once we get past the sanity checking phase

##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/testing/TestUniversalRunner.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.portability.testing;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.apache.beam.runners.portability.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. */
+public class TestUniversalRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestUniversalRunner.class);
+
+  private final PipelineOptions options;
+
+  private TestUniversalRunner(PipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static TestUniversalRunner fromOptions(PipelineOptions options) {

Review comment:
       Had to add this, because `TestPortableRunner` couples "check that the job succeeds" logic with a bunch of other things having to do with launching an existing Java runner as a portable runner, not relevant to actual portable runner services.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org