Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/07 20:24:15 UTC

[GitHub] [beam] ibzib commented on a change in pull request #14942: [BEAM-8137] Add Main method to ExternalWorkerService

ibzib commented on a change in pull request #14942:
URL: https://github.com/apache/beam/pull/14942#discussion_r646907646



##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -38,6 +44,7 @@
 public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase implements FnService {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExternalWorkerService.class);
+  private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";

Review comment:
       Nit: consider renaming this constant so it's obvious that it names an environment variable.
   ```suggestion
     private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";
   ```
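   To illustrate the nit, here is a minimal sketch of how the renamed constant reads at its call site. It assumes the environment is read through a `Function<String, String>` lookup, like the `main(Function<String, String>)` overload in this PR. The class `PipelineOptionsEnv` and the helper `readPipelineOptionsJson` are hypothetical. The explicit check for a missing variable is also an addition and is not in the PR.

   ```java
   import java.util.function.Function;

   // Hypothetical helper illustrating the rename; not code from this PR.
   final class PipelineOptionsEnv {
     // The _ENV_VAR suffix makes clear this is the *name* of an environment variable,
     // not a pipeline option value.
     static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";

     private PipelineOptionsEnv() {}

     /** Reads the pipeline options JSON through an injectable lookup such as System::getenv. */
     static String readPipelineOptionsJson(Function<String, String> environmentVarGetter) {
       String json = environmentVarGetter.apply(PIPELINE_OPTIONS_ENV_VAR);
       if (json == null || json.isEmpty()) {
         // Fail early with a message that names the missing variable.
         throw new IllegalStateException(
             "Environment variable " + PIPELINE_OPTIONS_ENV_VAR + " is not set");
       }
       return json;
     }
   }
   ```

   In a test, a map can stand in for the environment, e.g. `readPipelineOptionsJson(Map.of(PIPELINE_OPTIONS_ENV_VAR, "{}")::get)`.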

##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -90,10 +97,55 @@ public void stopWorker(
   public void close() {}
 
   public GrpcFnServer<ExternalWorkerService> start() throws Exception {
-    GrpcFnServer<ExternalWorkerService> server =
-        GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    final String externalServiceAddress =
+        Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
+    GrpcFnServer<ExternalWorkerService> server;
+    if (externalServiceAddress.isEmpty()) {
+      server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    } else {
+      server =
+          GrpcFnServer.create(
+              this,
+              Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
+              serverFactory);
+    }
     LOG.debug(
         "Listening for worker start requests at {}.", server.getApiServiceDescriptor().getUrl());
     return server;
   }
+
+  /**
+   * Worker pool entry point.
+   *
+   * <p>The worker pool exposes an RPC service that is used with EXTERNAL environment to start and
+   * stop the SDK workers.
+   *
+   * <p>The worker pool uses threads for parallelism;
+   *
+   * <p>This entry point is used by the Java SDK container in worker pool mode.
+   */
+  public static void main(String[] args) throws Exception {
+    main(System::getenv);
+  }
+
+  public static void main(Function<String, String> environmentVarGetter) throws Exception {
+    System.out.format("Starting external worker service%n");

Review comment:
       Why `System.out` instead of the logger?
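   For comparison, here is a minimal sketch of the logger-based alternative. This class already logs through SLF4J (`LOG` in the diff), so the equivalent calls there would be `LOG.info("Starting external worker service")` and `LOG.info("Pipeline options {}", ...)`. To keep the sketch self-contained, it swaps in the JDK's `java.util.logging` instead. The class name `WorkerPoolStartupLog` and its method are hypothetical.

   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical sketch using java.util.logging as a stand-in for Beam's SLF4J logger.
   final class WorkerPoolStartupLog {
     static final Logger LOG = Logger.getLogger(WorkerPoolStartupLog.class.getName());

     private WorkerPoolStartupLog() {}

     static void logStartup(String pipelineOptionsJson, String serverUrl) {
       // Unlike System.out, each record carries a level, timestamp, and logger name,
       // and goes wherever the logging configuration routes it.
       LOG.info("Starting external worker service");
       // {0} is the java.util.logging placeholder; the SLF4J equivalent is {}.
       LOG.log(Level.INFO, "Pipeline options {0}", pipelineOptionsJson);
       LOG.log(Level.INFO, "External worker service started at address: {0}", serverUrl);
     }
   }
   ```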

##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -90,10 +97,55 @@ public void stopWorker(
+  public static void main(Function<String, String> environmentVarGetter) throws Exception {
+    System.out.format("Starting external worker service%n");
+    System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));
+    PipelineOptions options =
+        PipelineOptionsTranslation.fromJson(environmentVarGetter.apply(PIPELINE_OPTIONS));
+
+    try (GrpcFnServer<ExternalWorkerService> server = new ExternalWorkerService(options).start()) {
+      System.out.format(
+          "External worker service started at address: %s",
+          server.getApiServiceDescriptor().getUrl());
+      while (true) {
+        // Wait indefinitely to keep ExternalWorkerService running
+        Sleeper.DEFAULT.sleep(60 * 60 * 24 * 1000);

Review comment:
       Is there a reason to loop over a 24-hour sleep instead of simply using a single, very long sleep?
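   A further option, besides a loop over fixed sleeps or one very long sleep, is to block the main thread on a latch. Here is a minimal sketch of that approach. `BlockUntilShutdown` and its methods are hypothetical names, and nothing in the PR currently calls `requestShutdown()`.

   ```java
   import java.util.concurrent.CountDownLatch;

   // Hypothetical helper illustrating an alternative to `while (true) { sleep(24h); }`.
   final class BlockUntilShutdown {
     private final CountDownLatch shutdown = new CountDownLatch(1);

     /** Blocks the calling thread until requestShutdown() is called or the thread is interrupted. */
     void awaitShutdown() throws InterruptedException {
       // await() parks the thread: no polling interval to pick and no loop to maintain.
       shutdown.await();
     }

     /** Releases any thread blocked in awaitShutdown(); calling it more than once is harmless. */
     void requestShutdown() {
       shutdown.countDown();
     }

     boolean isShutdownRequested() {
       return shutdown.getCount() == 0;
     }
   }
   ```

   Inside the existing try-with-resources, `blocker.awaitShutdown();` would replace the `while (true)` loop, so the server is closed once the wait returns. If nothing ever calls `requestShutdown()`, it blocks indefinitely, just like the loop. A plain `Thread.sleep(Long.MAX_VALUE)` would also remove the loop. The latch additionally gives other code, such as a test, a clean way to end the wait.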

##########
File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -90,10 +97,55 @@ public void stopWorker(
+  public static void main(Function<String, String> environmentVarGetter) throws Exception {
+    System.out.format("Starting external worker service%n");
+    System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));

Review comment:
       This code assumes the pipeline options are known ahead of time. Ideally, a worker pool should be logically decoupled from any single pipeline and able to serve multiple pipelines with different options. That may not be possible, however, without modifying `StartWorkerRequest` to include pipeline options.

   This wasn't a problem in Python. The difference is that in Java, `FnHarness` takes pipeline options as an argument, whereas `sdk_worker.py`, as far as I can tell, does not use pipeline options at all.

   Taking pipeline options up front may be acceptable, but the documentation should make clear that `PIPELINE_OPTIONS` should match the pipeline options used by the job. Otherwise it will be confusing which set of options is actually read in various places.
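   To make the trade-off concrete, here is a minimal sketch of how a worker pool could resolve options per request if the request were extended as suggested. It falls back to the options supplied up front through `PIPELINE_OPTIONS`. `StartRequest` and its `pipelineOptionsJson` field are hypothetical stand-ins for a modified `StartWorkerRequest`. `PerRequestOptions` is likewise hypothetical and is not Beam's API.

   ```java
   import java.util.Optional;

   // Hypothetical sketch of per-request option resolution; not Beam's API.
   final class PerRequestOptions {
     /** Hypothetical stand-in for a StartWorkerRequest extended with pipeline options. */
     static final class StartRequest {
       final String workerId;
       final Optional<String> pipelineOptionsJson; // hypothetical field

       StartRequest(String workerId, Optional<String> pipelineOptionsJson) {
         this.workerId = workerId;
         this.pipelineOptionsJson = pipelineOptionsJson;
       }
     }

     // Pool-wide fallback, e.g. read once from PIPELINE_OPTIONS at startup.
     private final Optional<String> defaultOptionsJson;

     PerRequestOptions(Optional<String> defaultOptionsJson) {
       this.defaultOptionsJson = defaultOptionsJson;
     }

     /** Prefers options sent with the request, then the pool-wide default; fails if neither exists. */
     String resolve(StartRequest request) {
       return request.pipelineOptionsJson
           .or(() -> defaultOptionsJson)
           .orElseThrow(
               () -> new IllegalStateException("No pipeline options for worker " + request.workerId));
     }
   }
   ```

   With per-request options, one pool could serve jobs with different options. The up-front `PIPELINE_OPTIONS` value would then only apply to requests that omit them, which would remove the ambiguity described above.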




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org