You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/03 01:25:26 UTC

[2/4] beam git commit: Add a runners/java-fn-execution module

Add a runners/java-fn-execution module

This contains libraries for runner authors to create Fn API services and
RPCs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/927a8db1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/927a8db1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/927a8db1

Branch: refs/heads/master
Commit: 927a8db1397bc43c6cb253d6ca856afdbfa472a3
Parents: fdd5971
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 11 18:47:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 2 18:25:12 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 runners/java-fn-execution/pom.xml               | 105 +++++++++++++++
 .../beam/runners/fnexecution/ServerFactory.java | 104 +++++++++++++++
 .../beam/runners/fnexecution/package-info.java  |  23 ++++
 .../runners/fnexecution/ServerFactoryTest.java  | 128 +++++++++++++++++++
 runners/pom.xml                                 |   1 +
 6 files changed, 367 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f1eee91..b2ab5d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -684,6 +684,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-java-fn-execution</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-reference-job-orchestrator</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
new file mode 100644
index 0000000..bd4fcf0
--- /dev/null
+++ b/runners/java-fn-execution/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-java-fn-execution</artifactId>
+
+  <name>Apache Beam :: Runners :: Java Fn Execution</name>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <!-- Overrides the Beam parent's compiler settings so that this module builds as Java 8. -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!--
+      No versions are declared in this section; presumably they are managed by a parent POM
+      (TODO confirm against the dependencyManagement of the parent).
+    -->
+
+    <!-- Beam model and SDK-side Fn execution artifacts -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-fn-execution</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+    </dependency>
+
+    <!-- gRPC -->
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+    </dependency>
+
+    <!-- Guava -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--
+      The test-jar of the SDK Fn execution module, declared in addition to its main jar above.
+      Presumably this supplies the test helpers used by the tests of this module (TODO confirm).
+    -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
new file mode 100644
index 0000000..918672a
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.beam.harness.channel.SocketAddressFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+
+/**
+ * A {@link Server gRPC server} factory.
+ */
+public abstract class ServerFactory {
+  /**
+   * Returns the default {@link ServerFactory}, an {@link InetSocketAddressServerFactory}.
+   */
+  public static ServerFactory createDefault() {
+    return new InetSocketAddressServerFactory();
+  }
+
+  /**
+   * Creates a server for the given service on an ephemeral port that is chosen automatically.
+   * The caller learns the chosen port from the URL that is set on the provided {@link
+   * Endpoints.ApiServiceDescriptor.Builder}.
+   */
+  public abstract Server allocatePortAndCreate(
+      BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) throws IOException;
+
+  /**
+   * Creates a server for the given service at the address named by {@code serviceDescriptor}.
+   */
+  public abstract Server create(
+      BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) throws IOException;
+
+  /**
+   * The default {@link ServerFactory}, which creates Netty-based {@link Server gRPC servers}.
+   *
+   * <p>Allocated servers advertise the loopback host name and the port they started on.
+   */
+  public static class InetSocketAddressServerFactory extends ServerFactory {
+    private InetSocketAddressServerFactory() {}
+
+    @Override
+    public Server allocatePortAndCreate(
+        BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+        throws IOException {
+      // Port 0 requests an ephemeral port; the real one is read back from the started server.
+      InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      Server started = createServer(service, loopback);
+      HostAndPort bound = HostAndPort.fromParts(loopback.getHostName(), started.getPort());
+      apiServiceDescriptor.setUrl(bound.toString());
+      return started;
+    }
+
+    @Override
+    public Server create(BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor)
+        throws IOException {
+      String url = serviceDescriptor.getUrl();
+      SocketAddress parsed = SocketAddressFactory.createFrom(url);
+      checkArgument(
+          parsed instanceof InetSocketAddress,
+          "%s %s requires a host:port socket address, got %s",
+          getClass().getSimpleName(), ServerFactory.class.getSimpleName(), url);
+      return createServer(service, (InetSocketAddress) parsed);
+    }
+
+    /** Builds and starts a gRPC server for {@code service} on the port of {@code socket}. */
+    private static Server createServer(BindableService service, InetSocketAddress socket)
+        throws IOException {
+      // Only the port of the address is handed to the builder. The message size is set to the
+      // max value here; the actual size is governed by the buffer size in the layers above.
+      return NettyServerBuilder.forPort(socket.getPort())
+          .addService(service)
+          .maxMessageSize(Integer.MAX_VALUE)
+          .build()
+          .start();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
new file mode 100644
index 0000000..bc36f5e
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities used by runners to interact with the fn execution components of the Beam Portability
+ * Framework.
+ */
+package org.apache.beam.runners.fnexecution;

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
new file mode 100644
index 0000000..aa8d246
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.harness.test.TestStreams;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ServerFactory}.
+ */
+public class ServerFactoryTest {
+
+  // The two directions carry distinct payloads so that the assertions can tell the client-to-server
+  // stream apart from the server-to-client stream.
+  private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder()
+      .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
+      .build();
+  private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder()
+      .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("2"))
+      .build();
+
+  /** The default factory advertises the loopback host and a valid port. */
+  @Test
+  public void testCreatingDefaultServer() throws Exception {
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault());
+    HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
+    // The loopback address may be advertised either by name or by literal address.
+    assertThat(hostAndPort.getHost(), anyOf(
+        equalTo(InetAddress.getLoopbackAddress().getHostName()),
+        equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
+    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
+  }
+
+  /**
+   * Starts a server with {@code serverFactory}, connects to it with {@code channelFactory}, and
+   * exchanges one message in each direction over a single data stream.
+   *
+   * <p>The server and the channel are shut down before this method returns, whether or not the
+   * exchange succeeded.
+   *
+   * @return the descriptor that the server factory filled in for the started server
+   */
+  private Endpoints.ApiServiceDescriptor runTestUsing(
+      ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
+    Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
+        Endpoints.ApiServiceDescriptor.newBuilder();
+
+    Collection<Elements> serverElements = new ArrayList<>();
+    CountDownLatch clientHungUp = new CountDownLatch(1);
+    CallStreamObserver<Elements> serverInboundObserver =
+        TestStreams.withOnNext(serverElements::add)
+        .withOnCompleted(clientHungUp::countDown)
+        .build();
+    TestDataService service = new TestDataService(serverInboundObserver);
+    Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
+    ManagedChannel channel = null;
+    try {
+      assertFalse(server.isShutdown());
+
+      channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
+      BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
+      Collection<Elements> clientElements = new ArrayList<>();
+      CountDownLatch serverHungUp = new CountDownLatch(1);
+      CallStreamObserver<Elements> clientInboundObserver =
+          TestStreams.withOnNext(clientElements::add)
+          .withOnCompleted(serverHungUp::countDown)
+          .build();
+
+      StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver);
+      // Blocks until the service has been called and has handed over its outbound observer.
+      StreamObserver<Elements> serverOutboundObserver = service.outboundObservers.take();
+
+      clientOutboundObserver.onNext(CLIENT_DATA);
+      serverOutboundObserver.onNext(SERVER_DATA);
+      // Each side completes its outbound stream, then the test waits for the peer to observe it.
+      clientOutboundObserver.onCompleted();
+      clientHungUp.await();
+      serverOutboundObserver.onCompleted();
+      serverHungUp.await();
+
+      assertThat(clientElements, contains(SERVER_DATA));
+      assertThat(serverElements, contains(CLIENT_DATA));
+    } finally {
+      // Release the client connection and the server port even when an assertion above fails.
+      if (channel != null) {
+        channel.shutdownNow();
+      }
+      server.shutdownNow();
+    }
+
+    return apiServiceDescriptorBuilder.build();
+  }
+
+  /** A test gRPC service that uses the provided inbound observer for all clients. */
+  private static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase {
+    // Outbound observers of connected clients, in the order in which the clients called data().
+    private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
+    private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+    private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
+      this.inboundObserver = inboundObserver;
+      this.outboundObservers = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public StreamObserver<BeamFnApi.Elements> data(
+        StreamObserver<BeamFnApi.Elements> outboundObserver) {
+      // Hand the outbound observer to the test thread, which takes it from the queue.
+      Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver);
+      return inboundObserver;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 164d1b3..df3faa9 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -63,6 +63,7 @@
         <jdk>[1.8,)</jdk>
       </activation>
       <modules>
+        <module>java-fn-execution</module>
         <module>gearpump</module>
       </modules>
     </profile>