You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/03 01:25:26 UTC
[2/4] beam git commit: Add a runners/java-fn-execution module
Add a runners/java-fn-execution module
This contains libraries for runner authors to create Fn API services and
RPCs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/927a8db1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/927a8db1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/927a8db1
Branch: refs/heads/master
Commit: 927a8db1397bc43c6cb253d6ca856afdbfa472a3
Parents: fdd5971
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 11 18:47:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 2 18:25:12 2017 -0700
----------------------------------------------------------------------
pom.xml | 6 +
runners/java-fn-execution/pom.xml | 105 +++++++++++++++
.../beam/runners/fnexecution/ServerFactory.java | 104 +++++++++++++++
.../beam/runners/fnexecution/package-info.java | 23 ++++
.../runners/fnexecution/ServerFactoryTest.java | 128 +++++++++++++++++++
runners/pom.xml | 1 +
6 files changed, 367 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f1eee91..b2ab5d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -684,6 +684,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-java-fn-execution</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-reference-job-orchestrator</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
new file mode 100644
index 0000000..bd4fcf0
--- /dev/null
+++ b/runners/java-fn-execution/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-parent</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-runners-java-fn-execution</artifactId>
+
+ <name>Apache Beam :: Runners :: Java Fn Execution</name>
+
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Override Beam parent to allow Java8 -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-model-pipeline</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-model-fn-execution</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-fn-execution</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-fn-execution</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
new file mode 100644
index 0000000..918672a
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.beam.harness.channel.SocketAddressFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+
+/**
+ * A {@link Server gRPC server} factory.
+ */
+public abstract class ServerFactory {
+ /**
+ * Create a default {@link ServerFactory}.
+ */
+ public static ServerFactory createDefault() {
+ return new InetSocketAddressServerFactory();
+ }
+
+ /**
+ * Creates an instance of this server using an ephemeral port chosen automatically. The chosen
+ * port is accessible to the caller from the URL set in the input {@link
+ * Endpoints.ApiServiceDescriptor.Builder}.
+ */
+ public abstract Server allocatePortAndCreate(
+ BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) throws IOException;
+
+ /**
+ * Creates an instance of this server at the address specified by the given service descriptor.
+ */
+ public abstract Server create(
+ BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) throws IOException;
+
+ /**
+ * Creates a {@link Server gRPC Server} using the default server factory.
+ *
+ * <p>The server is created listening any open port on "localhost".
+ */
+ public static class InetSocketAddressServerFactory extends ServerFactory {
+ private InetSocketAddressServerFactory() {}
+
+ @Override
+ public Server allocatePortAndCreate(
+ BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+ throws IOException {
+ InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+ Server server = createServer(service, address);
+ apiServiceDescriptor.setUrl(
+ HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
+ return server;
+ }
+
+ @Override
+ public Server create(BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor)
+ throws IOException {
+ SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+ checkArgument(
+ socketAddress instanceof InetSocketAddress,
+ "%s %s requires a host:port socket address, got %s",
+ getClass().getSimpleName(), ServerFactory.class.getSimpleName(),
+ serviceDescriptor.getUrl());
+ return createServer(service, (InetSocketAddress) socketAddress);
+ }
+
+ private static Server createServer(BindableService service, InetSocketAddress socket)
+ throws IOException {
+ Server server =
+ NettyServerBuilder.forPort(socket.getPort())
+ .addService(service)
+ // Set the message size to max value here. The actual size is governed by the
+ // buffer size in the layers above.
+ .maxMessageSize(Integer.MAX_VALUE)
+ .build();
+ server.start();
+ return server;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
new file mode 100644
index 0000000..bc36f5e
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities used by runners to interact with the fn execution components of the Beam Portability
+ * Framework.
+ */
+package org.apache.beam.runners.fnexecution;
http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
new file mode 100644
index 0000000..aa8d246
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.harness.test.TestStreams;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ServerFactory}.
+ */
+public class ServerFactoryTest {
+
+ private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
+ .build();
+ private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
+ .build();
+
+ @Test
+ public void testCreatingDefaultServer() throws Exception {
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault());
+ HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
+ assertThat(hostAndPort.getHost(), anyOf(
+ equalTo(InetAddress.getLoopbackAddress().getHostName()),
+ equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
+ assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
+ }
+
+ private Endpoints.ApiServiceDescriptor runTestUsing(
+ ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
+ Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
+ Endpoints.ApiServiceDescriptor.newBuilder();
+
+ Collection<Elements> serverElements = new ArrayList<>();
+ CountDownLatch clientHangedUp = new CountDownLatch(1);
+ CallStreamObserver<Elements> serverInboundObserver =
+ TestStreams.withOnNext(serverElements::add)
+ .withOnCompleted(clientHangedUp::countDown)
+ .build();
+ TestDataService service = new TestDataService(serverInboundObserver);
+ Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
+ assertFalse(server.isShutdown());
+
+ ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
+ BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
+ Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
+ CountDownLatch serverHangedUp = new CountDownLatch(1);
+ CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
+ TestStreams.withOnNext(clientElements::add)
+ .withOnCompleted(serverHangedUp::countDown)
+ .build();
+
+ StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver);
+ StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take();
+
+ clientOutboundObserver.onNext(CLIENT_DATA);
+ serverOutboundObserver.onNext(SERVER_DATA);
+ clientOutboundObserver.onCompleted();
+ clientHangedUp.await();
+ serverOutboundObserver.onCompleted();
+ serverHangedUp.await();
+
+ assertThat(clientElements, contains(SERVER_DATA));
+ assertThat(serverElements, contains(CLIENT_DATA));
+
+ return apiServiceDescriptorBuilder.build();
+ }
+
+ /** A test gRPC service that uses the provided inbound observer for all clients. */
+ private static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase {
+ private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
+ private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+ private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
+ this.inboundObserver = inboundObserver;
+ this.outboundObservers = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public StreamObserver<BeamFnApi.Elements> data(
+ StreamObserver<BeamFnApi.Elements> outboundObserver) {
+ Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver);
+ return inboundObserver;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 164d1b3..df3faa9 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -63,6 +63,7 @@
<jdk>[1.8,)</jdk>
</activation>
<modules>
+ <module>java-fn-execution</module>
<module>gearpump</module>
</modules>
</profile>