You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/05 08:45:51 UTC

[beam] branch master updated: [BEAM-6731] Move ExpansionService to core-construction-java

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a803102  [BEAM-6731] Move ExpansionService to core-construction-java
     new ee96f66  Merge pull request #7981: [BEAM-6731] Move ExpansionService to core-construction-java
a803102 is described below

commit a80310223cb61f8fba5bf8cf317521f013f6b4e1
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Mar 4 14:04:00 2019 +0100

    [BEAM-6731] Move ExpansionService to core-construction-java
    
    This moves the previously relocated ExpansionServer back to
    core-construction-java.
---
 runners/core-construction-java/build.gradle        | 26 +++++++
 .../construction/expansion/ExpansionServer.java    | 81 ++++++++++++++++++++++
 .../construction}/expansion/ExpansionService.java  |  5 +-
 .../core/construction}/expansion/package-info.java |  2 +-
 .../expansion/ExpansionServiceTest.java            |  4 +-
 .../expansion/TestExpansionService.java            |  6 +-
 .../beam/runners/flink/FlinkJobServerDriver.java   | 14 ++--
 runners/java-fn-execution/build.gradle             | 27 --------
 .../fnexecution/jobsubmission/JobServerDriver.java | 42 ++++-------
 sdks/python/build.gradle                           |  8 +--
 10 files changed, 139 insertions(+), 76 deletions(-)

diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle
index 563c0cf..d1b1825 100644
--- a/runners/core-construction-java/build.gradle
+++ b/runners/core-construction-java/build.gradle
@@ -51,3 +51,29 @@ dependencies {
   shadowTest project(path: ":beam-model-fn-execution", configuration: "shadow")
 }
 
+task runExpansionService (type: JavaExec) {
+  main = "org.apache.beam.runners.core.construction.expansion.ExpansionService"
+  classpath = sourceSets.main.runtimeClasspath
+  args = [project.findProperty("constructionService.port") ?: "8097"]
+}
+
+task runTestExpansionService (type: JavaExec) {
+  main = "org.apache.beam.runners.core.construction.expansion.TestExpansionService"
+  classpath = sourceSets.test.runtimeClasspath
+  args = [project.findProperty("constructionService.port") ?: "8097"]
+}
+
+task buildTestExpansionServiceJar(type: Jar) {
+  dependsOn = [shadowJar, shadowTestJar]
+  appendix = "testExpansionService"
+  // Use zip64 mode to avoid "Archive contains more than 65535 entries".
+  zip64 = true
+  manifest {
+    attributes(
+            'Main-Class': 'org.apache.beam.runners.core.construction.expansion.TestExpansionService'
+    )
+  }
+  from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }}
+  from sourceSets.main.output
+  from sourceSets.test.output
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java
new file mode 100644
index 0000000..d152e76
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.expansion;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.netty.NettyServerBuilder;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
+
+/** A {@link Server gRPC Server} for an ExpansionService. */
+public class ExpansionServer implements AutoCloseable {
+  /**
+   * Create a {@link ExpansionServer} for the provided ExpansionService running on an arbitrary
+   * port.
+   *
+   * <p>If port is 0, a free port will be assigned.
+   */
+  public static ExpansionServer create(ExpansionService service, String host, int port)
+      throws IOException {
+    return new ExpansionServer(service, host, port);
+  }
+
+  private final String host;
+  private final Server server;
+  private final ExpansionService service;
+
+  private ExpansionServer(ExpansionService service, String host, int port) throws IOException {
+    this.service = Preconditions.checkNotNull(service);
+    this.host = Preconditions.checkNotNull(host);
+    this.server =
+        NettyServerBuilder.forAddress(new InetSocketAddress(host, port))
+            .addService(service)
+            .build()
+            .start();
+  }
+
+  /** Get the ExpansionService exposed by this {@link ExpansionServer}. */
+  public ExpansionService getService() {
+    return service;
+  }
+
+  /** Get the host that this {@link ExpansionServer} is bound to. */
+  public String getHost() {
+    return host;
+  }
+
+  /** Get the port that this {@link ExpansionServer} is bound to. */
+  public int getPort() {
+    return server.getPort();
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      // The server has been closed, and should not respond to any new incoming calls.
+      server.shutdown();
+      service.close();
+      server.awaitTermination(60, TimeUnit.SECONDS);
+    } finally {
+      server.shutdownNow();
+      server.awaitTermination();
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
similarity index 98%
rename from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/ExpansionService.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
index 7c0b988..a49e8ad 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/ExpansionService.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -32,7 +32,6 @@ import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
 
 /** A service that allows pipeline expand transforms from a remote SDK. */
 public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase
-    implements FnService {
+    implements AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/package-info.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java
similarity index 93%
rename from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/package-info.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java
index 1d5625c..6006092 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/package-info.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java
@@ -17,4 +17,4 @@
  */
 
 /** Classes used to expand cross-language transforms. */
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/ExpansionServiceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java
similarity index 96%
rename from runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/ExpansionServiceTest.java
rename to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java
index a108c77..100a4e4 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/ExpansionServiceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -35,7 +35,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.junit.Test;
 
-/** Tests for {@link org.apache.beam.runners.fnexecution.expansion.ExpansionService}. */
+/** Tests for {@link ExpansionService}. */
 public class ExpansionServiceTest {
 
   private static final String TEST_URN = "test:beam:transforms:count";
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/TestExpansionService.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java
similarity index 93%
rename from runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/TestExpansionService.java
rename to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java
index 5207c80..3ec867a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/TestExpansionService.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
@@ -25,7 +25,9 @@ import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 
-/** An {@link ExpansionService} useful for tests. */
+/**
+ * An {@link org.apache.beam.runners.core.construction.expansion.ExpansionService} useful for tests.
+ */
 public class TestExpansionService {
 
   private static final String TEST_COUNT_URN = "pytest:beam:transforms:count";
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0b4689d..2f3f981 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -89,25 +89,21 @@ public class FlinkJobServerDriver extends JobServerDriver {
     return create(
         configuration,
         createJobServerFactory(configuration),
-        createArtifactServerFactory(configuration),
-        createExpansionServerFactory(configuration));
+        createArtifactServerFactory(configuration));
   }
 
   public static FlinkJobServerDriver create(
       FlinkServerConfiguration configuration,
       ServerFactory jobServerFactory,
-      ServerFactory artifactServerFactory,
-      ServerFactory expansionServerFactory) {
-    return new FlinkJobServerDriver(
-        configuration, jobServerFactory, artifactServerFactory, expansionServerFactory);
+      ServerFactory artifactServerFactory) {
+    return new FlinkJobServerDriver(configuration, jobServerFactory, artifactServerFactory);
   }
 
   private FlinkJobServerDriver(
       FlinkServerConfiguration configuration,
       ServerFactory jobServerFactory,
-      ServerFactory artifactServerFactory,
-      ServerFactory expansionServerFactory) {
-    super(configuration, jobServerFactory, artifactServerFactory, expansionServerFactory);
+      ServerFactory artifactServerFactory) {
+    super(configuration, jobServerFactory, artifactServerFactory);
   }
 
   @Override
diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle
index 5d48782..7c5cb14 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -58,30 +58,3 @@ task testDocker(type: Test) {
     includeCategories "org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker"
   }
 }
-
-task runExpansionService (type: JavaExec) {
-  main = "org.apache.beam.runners.fnexecution.expansion.ExpansionService"
-  classpath = sourceSets.main.runtimeClasspath
-  args = [project.findProperty("constructionService.port") ?: "8097"]
-}
-
-task runTestExpansionService (type: JavaExec) {
-  main = "org.apache.beam.runners.fnexecution.expansion.TestExpansionService"
-  classpath = sourceSets.test.runtimeClasspath
-  args = [project.findProperty("constructionService.port") ?: "8097"]
-}
-
-task buildTestExpansionServiceJar(type: Jar) {
-  dependsOn = [shadowJar, shadowTestJar]
-  appendix = "testExpansionService"
-  // Use zip64 mode to avoid "Archive contains more than 65535 entries".
-  zip64 = true
-  manifest {
-    attributes(
-            'Main-Class': 'org.apache.beam.runners.fnexecution.expansion.TestExpansionService'
-    )
-  }
-  from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }}
-  from sourceSets.main.output
-  from sourceSets.test.output
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index c4b37da..daa9b56 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -20,10 +20,11 @@ package org.apache.beam.runners.fnexecution.jobsubmission;
 import java.io.IOException;
 import java.nio.file.Paths;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.construction.expansion.ExpansionServer;
+import org.apache.beam.runners.core.construction.expansion.ExpansionService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
-import org.apache.beam.runners.fnexecution.expansion.ExpansionService;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -38,11 +39,10 @@ public abstract class JobServerDriver implements Runnable {
 
   private final ServerFactory jobServerFactory;
   private final ServerFactory artifactServerFactory;
-  private final ServerFactory expansionServerFactory;
 
   private volatile GrpcFnServer<InMemoryJobService> jobServer;
   private volatile GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;
-  private volatile GrpcFnServer<ExpansionService> expansionServer;
+  private volatile ExpansionServer expansionServer;
 
   protected abstract JobInvoker createJobInvoker();
 
@@ -133,19 +133,13 @@ public abstract class JobServerDriver implements Runnable {
     return ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
   }
 
-  protected static ServerFactory createExpansionServerFactory(ServerConfiguration configuration) {
-    return ServerFactory.createWithPortSupplier(() -> configuration.expansionPort);
-  }
-
   protected JobServerDriver(
       ServerConfiguration configuration,
       ServerFactory jobServerFactory,
-      ServerFactory artifactServerFactory,
-      ServerFactory expansionServerFactory) {
+      ServerFactory artifactServerFactory) {
     this.configuration = configuration;
     this.jobServerFactory = jobServerFactory;
     this.artifactServerFactory = artifactServerFactory;
-    this.expansionServerFactory = expansionServerFactory;
   }
 
   // This method is executed by TestPortableRunner via Reflection
@@ -195,7 +189,8 @@ public abstract class JobServerDriver implements Runnable {
     if (expansionServer != null) {
       try {
         expansionServer.close();
-        LOG.info("Expansion stopped on {}", expansionServer.getApiServiceDescriptor().getUrl());
+        LOG.info(
+            "Expansion stopped on {}:{}", expansionServer.getHost(), expansionServer.getPort());
         expansionServer = null;
       } catch (Exception e) {
         LOG.error("Error while closing the Expansion Service.", e);
@@ -244,23 +239,14 @@ public abstract class JobServerDriver implements Runnable {
     return artifactStagingService;
   }
 
-  private GrpcFnServer<ExpansionService> createExpansionService() throws IOException {
-    ExpansionService service = new ExpansionService();
-    GrpcFnServer<ExpansionService> expansionServiceGrpcFnServer;
-    if (configuration.expansionPort == 0) {
-      expansionServiceGrpcFnServer =
-          GrpcFnServer.allocatePortAndCreateFor(service, expansionServerFactory);
-    } else {
-      Endpoints.ApiServiceDescriptor descriptor =
-          Endpoints.ApiServiceDescriptor.newBuilder()
-              .setUrl(configuration.host + ":" + configuration.expansionPort)
-              .build();
-      expansionServiceGrpcFnServer =
-          GrpcFnServer.create(service, descriptor, expansionServerFactory);
-    }
+  private ExpansionServer createExpansionService() throws IOException {
+    ExpansionServer expansionServer =
+        ExpansionServer.create(
+            new ExpansionService(), configuration.host, configuration.expansionPort);
     LOG.info(
-        "Java ExpansionService started on {}",
-        expansionServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
-    return expansionServiceGrpcFnServer;
+        "Java ExpansionService started on {}:{}",
+        expansionServer.getHost(),
+        expansionServer.getPort());
+    return expansionServer;
   }
 }
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index adb97ad..0ac62e9 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -210,7 +210,7 @@ task portableWordCount {
 def portableWordCountTask(name, streaming, crossLanguage) {
   tasks.create(name) {
     dependsOn = ['installGcpTest']
-    mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker', ':beam-sdks-java-container:docker', ':beam-runners-java-fn-execution:buildTestExpansionServiceJar']
+    mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker', ':beam-sdks-java-container:docker', ':beam-runners-core-construction-java:buildTestExpansionServiceJar']
     doLast {
       // TODO: Figure out GCS credentials and use real GCS input and output.
       def options = [
@@ -227,7 +227,7 @@ def portableWordCountTask(name, streaming, crossLanguage) {
         // workaround for local file output in docker container
         options += ["--environment_cache_millis=10000"]
       if (crossLanguage)
-        options += ["--expansion_service_jar=${project(":beam-runners-java-fn-execution:").buildTestExpansionServiceJar.archivePath}"]
+        options += ["--expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"]
       if (project.hasProperty("jobEndpoint"))
         options += ["--job_endpoint=${project.property('jobEndpoint')}"]
       def wordcountMain = crossLanguage ? "apache_beam.examples.wordcount_xlang" : "apache_beam.examples.wordcount"
@@ -455,11 +455,11 @@ project.task('createProcessWorker') {
 project.task('crossLanguagePythonJava') {
   dependsOn 'setupVirtualenv'
   dependsOn ':beam-sdks-java-container:docker'
-  dependsOn ':beam-runners-java-fn-execution:buildTestExpansionServiceJar'
+  dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar'
   doLast {
     exec {
       executable 'sh'
-      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-java-fn-execution:").buildTestExpansionServiceJar.archivePath}"
+      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"
     }
   }
 }