You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/10 18:21:25 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #11792: [BEAM-10435] Add ValidatesRunner task for local_job_service and Java SDK harness

kennknowles commented on a change in pull request #11792:
URL: https://github.com/apache/beam/pull/11792#discussion_r452988197



##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +38,161 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
+}
+
+// Does not background the process, but allows the process to daemonize itself
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task startLocalJobService {
+  dependsOn setupVirtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"
+//

Review comment:
       Done

##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/testing/TestUniversalRunner.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.portability.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.apache.beam.runners.portability.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. */
+public class TestUniversalRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestUniversalRunner.class);
+
+  private final PipelineOptions options;
+
+  private TestUniversalRunner(PipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static TestUniversalRunner fromOptions(PipelineOptions options) {
+    return new TestUniversalRunner(options);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    try {
+      Options testOptions = options.as(Options.class);
+      String endpoint =
+          "localhost:"
+              + new String(
+                      Files.readAllBytes(Paths.get(testOptions.getLocalJobServicePortFile())),
+                      Charsets.UTF_8)
+                  .trim();
+
+      PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
+      portableOptions.setJobEndpoint(endpoint);
+      portableOptions.setRunner(PortableRunner.class);
+      PortableRunner runner = PortableRunner.fromOptions(portableOptions);
+      PipelineResult result = runner.run(pipeline);
+      assertThat(
+          "Pipeline did not succeed.",
+          result.waitUntilFinish(),
+          Matchers.is(PipelineResult.State.DONE));
+      return result;
+    } catch (IOException e) {

Review comment:
       Done

##########
File path: sdks/python/apache_beam/runners/portability/local_job_service_main.py
##########
@@ -99,11 +104,23 @@ def run(argv):
       options.port_file = os.path.splitext(options.pid_file)[0] + '.port'
       argv.append('--port_file')
       argv.append(options.port_file)
+
+    if not options.stdout_file:
+      raise RuntimeError('--stdout_file must be specified with --background')
+    stdout_dest = open(options.stdout_file, mode='w')

Review comment:
       Yeah, I think my rebasing clobbered that thread. It is not an issue: parent file descriptors are not closed. I think you can find some links to the prior conversation on the PR front page, since it will still show it.
   
   (I won't rebase from here on out until the review is done.)

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +38,161 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
+}
+
+// Does not background the process, but allows the process to daemonize itself
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task startLocalJobService {
+  dependsOn setupVirtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"
+//
+//    File pidFile = new File(localJobServicePidFile)
+//    int totalSleep = 0
+//    while (!pidFile.exists()) {
+//      sleep(500)
+//      totalSleep += 500
+//      if (totalSleep > 5000) {
+//        throw new RuntimeException("Local job service pid file never showed up");
+//      }
+//    }
+  }
 }
+
+task stopLocalJobService {
+  doLast {
+    execInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--stop",
+        "--pid_file=${localJobServicePidFile}"
+  }
+}
+
+startLocalJobService.finalizedBy stopLocalJobService
+
+/**
+ * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main
+ * with subprocess SDK harness environments.
+ */
+task ulrValidatesRunnerTests(type: Test) {
+  dependsOn ":sdks:java:container:docker"
+
+  if (!project.hasProperty("localJobServicePortFile")) {
+    dependsOn startLocalJobService
+  }
+
+  group = "Verification"
+  description "PortableRunner Java subprocess ValidatesRunner suite"
+  classpath = configurations.validatesRunner
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+      "--runner=TestUniversalRunner",
+      "--experiments=beam_fn_api",
+      "--localJobServicePortFile=${localJobServicePortFile}"
+  ])

Review comment:
       I specifically want the Docker dependency, so that we have a test of the true Java SDK harness container without the complexity of a production runner. But that can be a postcommit test, and if LOOPBACK is faster and easier to debug, that's good for precommit. I'd like to leave this as-is to avoid churning this PR, but I will follow up and create a LOOPBACK version before creating any Jenkins job.

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +38,161 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
+}
+
+// Does not background the process, but allows the process to daemonize itself
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task startLocalJobService {
+  dependsOn setupVirtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"
+//
+//    File pidFile = new File(localJobServicePidFile)
+//    int totalSleep = 0
+//    while (!pidFile.exists()) {
+//      sleep(500)
+//      totalSleep += 500
+//      if (totalSleep > 5000) {
+//        throw new RuntimeException("Local job service pid file never showed up");
+//      }
+//    }
+  }
 }
+
+task stopLocalJobService {
+  doLast {
+    execInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--stop",
+        "--pid_file=${localJobServicePidFile}"
+  }
+}
+
+startLocalJobService.finalizedBy stopLocalJobService
+
+/**
+ * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main
+ * with subprocess SDK harness environments.
+ */
+task ulrValidatesRunnerTests(type: Test) {
+  dependsOn ":sdks:java:container:docker"
+
+  if (!project.hasProperty("localJobServicePortFile")) {
+    dependsOn startLocalJobService
+  }
+
+  group = "Verification"
+  description "PortableRunner Java subprocess ValidatesRunner suite"
+  classpath = configurations.validatesRunner
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+      "--runner=TestUniversalRunner",
+      "--experiments=beam_fn_api",
+      "--localJobServicePortFile=${localJobServicePortFile}"
+  ])
+  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
+  useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
+  }
+  filter {
+    // There is not currently a category for excluding these _only_ in committed mode

Review comment:
       Done. I also added a new Jira component, `runner-universal`, since I did not find an existing one, in case anyone needs to search for these issues.

##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/testing/TestUniversalRunner.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.portability.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.apache.beam.runners.portability.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. */
+public class TestUniversalRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestUniversalRunner.class);
+
+  private final PipelineOptions options;
+
+  private TestUniversalRunner(PipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static TestUniversalRunner fromOptions(PipelineOptions options) {
+    return new TestUniversalRunner(options);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    try {
+      Options testOptions = options.as(Options.class);
+      String endpoint =
+          "localhost:"
+              + new String(
+                      Files.readAllBytes(Paths.get(testOptions.getLocalJobServicePortFile())),
+                      Charsets.UTF_8)
+                  .trim();
+
+      PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
+      portableOptions.setJobEndpoint(endpoint);
+      portableOptions.setRunner(PortableRunner.class);
+      PortableRunner runner = PortableRunner.fromOptions(portableOptions);
+      PipelineResult result = runner.run(pipeline);
+      assertThat(
+          "Pipeline did not succeed.",
+          result.waitUntilFinish(),
+          Matchers.is(PipelineResult.State.DONE));
+      return result;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public interface Options extends TestPipelineOptions, PortablePipelineOptions {
+    /**
+     * A file containing the job service port, since Gradle needs to know this filename statically
+     * to provide it in Beam testing options.
+     */
+    @Description("File containing local job service port.")

Review comment:
       Yeah, this sort of thing is actually what took 90% of the time for this PR: scraping around Gradle's docs and the internet for ways to insert that little bit of logic, before realizing it was sort of against the grain. Pipeline options are passed as a Java system property, and those are set up in the Gradle graph construction phase. More generally, there's no Gradle graph execution-time slot for free-form code that also reuses the Test task type. Perhaps they expect you to use inheritance and make a new Task type, which I would rather not do ;_;
   
   It would be fine to have two pipeline options, so that the simple use case stays simple.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org