You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/05 08:45:51 UTC
[beam] branch master updated: [BEAM-6731] Move ExpansionService to
core-construction-java
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a803102 [BEAM-6731] Move ExpansionService to core-construction-java
new ee96f66 Merge pull request #7981: [BEAM-6731] Move ExpansionService to core-construction-java
a803102 is described below
commit a80310223cb61f8fba5bf8cf317521f013f6b4e1
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Mar 4 14:04:00 2019 +0100
[BEAM-6731] Move ExpansionService to core-construction-java
This moves the previously relocated ExpansionServer back to
core-construction-java.
---
runners/core-construction-java/build.gradle | 26 +++++++
.../construction/expansion/ExpansionServer.java | 81 ++++++++++++++++++++++
.../construction}/expansion/ExpansionService.java | 5 +-
.../core/construction}/expansion/package-info.java | 2 +-
.../expansion/ExpansionServiceTest.java | 4 +-
.../expansion/TestExpansionService.java | 6 +-
.../beam/runners/flink/FlinkJobServerDriver.java | 14 ++--
runners/java-fn-execution/build.gradle | 27 --------
.../fnexecution/jobsubmission/JobServerDriver.java | 42 ++++-------
sdks/python/build.gradle | 8 +--
10 files changed, 139 insertions(+), 76 deletions(-)
diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle
index 563c0cf..d1b1825 100644
--- a/runners/core-construction-java/build.gradle
+++ b/runners/core-construction-java/build.gradle
@@ -51,3 +51,29 @@ dependencies {
shadowTest project(path: ":beam-model-fn-execution", configuration: "shadow")
}
+task runExpansionService (type: JavaExec) {
+ main = "org.apache.beam.runners.core.construction.expansion.ExpansionService"
+ classpath = sourceSets.main.runtimeClasspath
+ args = [project.findProperty("constructionService.port") ?: "8097"]
+}
+
+task runTestExpansionService (type: JavaExec) {
+ main = "org.apache.beam.runners.core.construction.expansion.TestExpansionService"
+ classpath = sourceSets.test.runtimeClasspath
+ args = [project.findProperty("constructionService.port") ?: "8097"]
+}
+
+task buildTestExpansionServiceJar(type: Jar) {
+ dependsOn = [shadowJar, shadowTestJar]
+ appendix = "testExpansionService"
+ // Use zip64 mode to avoid "Archive contains more than 65535 entries".
+ zip64 = true
+ manifest {
+ attributes(
+ 'Main-Class': 'org.apache.beam.runners.core.construction.expansion.TestExpansionService'
+ )
+ }
+ from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }}
+ from sourceSets.main.output
+ from sourceSets.test.output
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java
new file mode 100644
index 0000000..d152e76
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.expansion;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.netty.NettyServerBuilder;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
+
+/** A {@link Server gRPC Server} for an ExpansionService. */
+public class ExpansionServer implements AutoCloseable {
+ /**
+ * Create a {@link ExpansionServer} for the provided ExpansionService running on an arbitrary
+ * port.
+ *
+ * <p>If port is 0, a free port will be assigned.
+ */
+ public static ExpansionServer create(ExpansionService service, String host, int port)
+ throws IOException {
+ return new ExpansionServer(service, host, port);
+ }
+
+ private final String host;
+ private final Server server;
+ private final ExpansionService service;
+
+ private ExpansionServer(ExpansionService service, String host, int port) throws IOException {
+ this.service = Preconditions.checkNotNull(service);
+ this.host = Preconditions.checkNotNull(host);
+ this.server =
+ NettyServerBuilder.forAddress(new InetSocketAddress(host, port))
+ .addService(service)
+ .build()
+ .start();
+ }
+
+ /** Get the ExpansionService exposed by this {@link ExpansionServer}. */
+ public ExpansionService getService() {
+ return service;
+ }
+
+ /** Get the host that this {@link ExpansionServer} is bound to. */
+ public String getHost() {
+ return host;
+ }
+
+ /** Get the port that this {@link ExpansionServer} is bound to. */
+ public int getPort() {
+ return server.getPort();
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ // The server has been closed, and should not respond to any new incoming calls.
+ server.shutdown();
+ service.close();
+ server.awaitTermination(60, TimeUnit.SECONDS);
+ } finally {
+ server.shutdownNow();
+ server.awaitTermination();
+ }
+ }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
similarity index 98%
rename from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/ExpansionService.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
index 7c0b988..a49e8ad 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/ExpansionService.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
import com.google.auto.service.AutoService;
import java.io.IOException;
@@ -32,7 +32,6 @@ import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
/** A service that allows pipeline expand transforms from a remote SDK. */
public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase
- implements FnService {
+ implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/package-info.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java
similarity index 93%
rename from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/package-info.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java
index 1d5625c..6006092 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/expansion/package-info.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java
@@ -17,4 +17,4 @@
*/
/** Classes used to expand cross-language transforms. */
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/ExpansionServiceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java
similarity index 96%
rename from runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/ExpansionServiceTest.java
rename to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java
index a108c77..100a4e4 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/ExpansionServiceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -35,7 +35,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.junit.Test;
-/** Tests for {@link org.apache.beam.runners.fnexecution.expansion.ExpansionService}. */
+/** Tests for {@link ExpansionService}. */
public class ExpansionServiceTest {
private static final String TEST_URN = "test:beam:transforms:count";
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/TestExpansionService.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java
similarity index 93%
rename from runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/TestExpansionService.java
rename to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java
index 5207c80..3ec867a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/expansion/TestExpansionService.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.fnexecution.expansion;
+package org.apache.beam.runners.core.construction.expansion;
import com.google.auto.service.AutoService;
import java.util.Map;
@@ -25,7 +25,9 @@ import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-/** An {@link ExpansionService} useful for tests. */
+/**
+ * An {@link org.apache.beam.runners.core.construction.expansion.ExpansionService} useful for tests.
+ */
public class TestExpansionService {
private static final String TEST_COUNT_URN = "pytest:beam:transforms:count";
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0b4689d..2f3f981 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -89,25 +89,21 @@ public class FlinkJobServerDriver extends JobServerDriver {
return create(
configuration,
createJobServerFactory(configuration),
- createArtifactServerFactory(configuration),
- createExpansionServerFactory(configuration));
+ createArtifactServerFactory(configuration));
}
public static FlinkJobServerDriver create(
FlinkServerConfiguration configuration,
ServerFactory jobServerFactory,
- ServerFactory artifactServerFactory,
- ServerFactory expansionServerFactory) {
- return new FlinkJobServerDriver(
- configuration, jobServerFactory, artifactServerFactory, expansionServerFactory);
+ ServerFactory artifactServerFactory) {
+ return new FlinkJobServerDriver(configuration, jobServerFactory, artifactServerFactory);
}
private FlinkJobServerDriver(
FlinkServerConfiguration configuration,
ServerFactory jobServerFactory,
- ServerFactory artifactServerFactory,
- ServerFactory expansionServerFactory) {
- super(configuration, jobServerFactory, artifactServerFactory, expansionServerFactory);
+ ServerFactory artifactServerFactory) {
+ super(configuration, jobServerFactory, artifactServerFactory);
}
@Override
diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle
index 5d48782..7c5cb14 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -58,30 +58,3 @@ task testDocker(type: Test) {
includeCategories "org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker"
}
}
-
-task runExpansionService (type: JavaExec) {
- main = "org.apache.beam.runners.fnexecution.expansion.ExpansionService"
- classpath = sourceSets.main.runtimeClasspath
- args = [project.findProperty("constructionService.port") ?: "8097"]
-}
-
-task runTestExpansionService (type: JavaExec) {
- main = "org.apache.beam.runners.fnexecution.expansion.TestExpansionService"
- classpath = sourceSets.test.runtimeClasspath
- args = [project.findProperty("constructionService.port") ?: "8097"]
-}
-
-task buildTestExpansionServiceJar(type: Jar) {
- dependsOn = [shadowJar, shadowTestJar]
- appendix = "testExpansionService"
- // Use zip64 mode to avoid "Archive contains more than 65535 entries".
- zip64 = true
- manifest {
- attributes(
- 'Main-Class': 'org.apache.beam.runners.fnexecution.expansion.TestExpansionService'
- )
- }
- from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }}
- from sourceSets.main.output
- from sourceSets.test.output
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index c4b37da..daa9b56 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -20,10 +20,11 @@ package org.apache.beam.runners.fnexecution.jobsubmission;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.construction.expansion.ExpansionServer;
+import org.apache.beam.runners.core.construction.expansion.ExpansionService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
-import org.apache.beam.runners.fnexecution.expansion.ExpansionService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
@@ -38,11 +39,10 @@ public abstract class JobServerDriver implements Runnable {
private final ServerFactory jobServerFactory;
private final ServerFactory artifactServerFactory;
- private final ServerFactory expansionServerFactory;
private volatile GrpcFnServer<InMemoryJobService> jobServer;
private volatile GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;
- private volatile GrpcFnServer<ExpansionService> expansionServer;
+ private volatile ExpansionServer expansionServer;
protected abstract JobInvoker createJobInvoker();
@@ -133,19 +133,13 @@ public abstract class JobServerDriver implements Runnable {
return ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
}
- protected static ServerFactory createExpansionServerFactory(ServerConfiguration configuration) {
- return ServerFactory.createWithPortSupplier(() -> configuration.expansionPort);
- }
-
protected JobServerDriver(
ServerConfiguration configuration,
ServerFactory jobServerFactory,
- ServerFactory artifactServerFactory,
- ServerFactory expansionServerFactory) {
+ ServerFactory artifactServerFactory) {
this.configuration = configuration;
this.jobServerFactory = jobServerFactory;
this.artifactServerFactory = artifactServerFactory;
- this.expansionServerFactory = expansionServerFactory;
}
// This method is executed by TestPortableRunner via Reflection
@@ -195,7 +189,8 @@ public abstract class JobServerDriver implements Runnable {
if (expansionServer != null) {
try {
expansionServer.close();
- LOG.info("Expansion stopped on {}", expansionServer.getApiServiceDescriptor().getUrl());
+ LOG.info(
+ "Expansion stopped on {}:{}", expansionServer.getHost(), expansionServer.getPort());
expansionServer = null;
} catch (Exception e) {
LOG.error("Error while closing the Expansion Service.", e);
@@ -244,23 +239,14 @@ public abstract class JobServerDriver implements Runnable {
return artifactStagingService;
}
- private GrpcFnServer<ExpansionService> createExpansionService() throws IOException {
- ExpansionService service = new ExpansionService();
- GrpcFnServer<ExpansionService> expansionServiceGrpcFnServer;
- if (configuration.expansionPort == 0) {
- expansionServiceGrpcFnServer =
- GrpcFnServer.allocatePortAndCreateFor(service, expansionServerFactory);
- } else {
- Endpoints.ApiServiceDescriptor descriptor =
- Endpoints.ApiServiceDescriptor.newBuilder()
- .setUrl(configuration.host + ":" + configuration.expansionPort)
- .build();
- expansionServiceGrpcFnServer =
- GrpcFnServer.create(service, descriptor, expansionServerFactory);
- }
+ private ExpansionServer createExpansionService() throws IOException {
+ ExpansionServer expansionServer =
+ ExpansionServer.create(
+ new ExpansionService(), configuration.host, configuration.expansionPort);
LOG.info(
- "Java ExpansionService started on {}",
- expansionServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
- return expansionServiceGrpcFnServer;
+ "Java ExpansionService started on {}:{}",
+ expansionServer.getHost(),
+ expansionServer.getPort());
+ return expansionServer;
}
}
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index adb97ad..0ac62e9 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -210,7 +210,7 @@ task portableWordCount {
def portableWordCountTask(name, streaming, crossLanguage) {
tasks.create(name) {
dependsOn = ['installGcpTest']
- mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker', ':beam-sdks-java-container:docker', ':beam-runners-java-fn-execution:buildTestExpansionServiceJar']
+ mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker', ':beam-sdks-java-container:docker', ':beam-runners-core-construction-java:buildTestExpansionServiceJar']
doLast {
// TODO: Figure out GCS credentials and use real GCS input and output.
def options = [
@@ -227,7 +227,7 @@ def portableWordCountTask(name, streaming, crossLanguage) {
// workaround for local file output in docker container
options += ["--environment_cache_millis=10000"]
if (crossLanguage)
- options += ["--expansion_service_jar=${project(":beam-runners-java-fn-execution:").buildTestExpansionServiceJar.archivePath}"]
+ options += ["--expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"]
if (project.hasProperty("jobEndpoint"))
options += ["--job_endpoint=${project.property('jobEndpoint')}"]
def wordcountMain = crossLanguage ? "apache_beam.examples.wordcount_xlang" : "apache_beam.examples.wordcount"
@@ -455,11 +455,11 @@ project.task('createProcessWorker') {
project.task('crossLanguagePythonJava') {
dependsOn 'setupVirtualenv'
dependsOn ':beam-sdks-java-container:docker'
- dependsOn ':beam-runners-java-fn-execution:buildTestExpansionServiceJar'
+ dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar'
doLast {
exec {
executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-java-fn-execution:").buildTestExpansionServiceJar.archivePath}"
+ args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"
}
}
}