You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/12 02:46:00 UTC

[jira] [Commented] (BEAM-2899) Universal Local Runner

    [ https://issues.apache.org/jira/browse/BEAM-2899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287004#comment-16287004 ] 

ASF GitHub Bot commented on BEAM-2899:
--------------------------------------

tgroh closed pull request #4197: [BEAM-2899] Port DataBufferingOutboundObserver
URL: https://github.com/apache/beam/pull/4197
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index 5b47a581804..d51318c9351 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -25,7 +25,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.runners.fnexecution.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 /**
  * A high-level client for an SDK harness.
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
index 1be01a7b851..fcbcea10b86 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
@@ -20,6 +20,7 @@
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.util.WindowedValue;
 
diff --git a/sdks/java/fn-execution/build.gradle b/sdks/java/fn-execution/build.gradle
index 34be858783a..45213a75e5f 100644
--- a/sdks/java/fn-execution/build.gradle
+++ b/sdks/java/fn-execution/build.gradle
@@ -26,6 +26,7 @@ dependencies {
   shadow project(path: ":beam-model-parent:beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-model-parent:beam-model-fn-execution", configuration: "shadow")
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: "shadow")
+  shadow library.java.slf4j_api
   shadow library.java.grpc_core
   shadow library.java.grpc_stub
   shadow library.java.grpc_netty
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
index 6e32e032bc6..82fc3ec7ce9 100644
--- a/sdks/java/fn-execution/pom.xml
+++ b/sdks/java/fn-execution/pom.xml
@@ -45,6 +45,8 @@
       <artifactId>beam-model-fn-execution</artifactId>
     </dependency>
 
+      <!-- The Core SDK is used for utility code and concepts shared between runner and SDK. It
+      should not be used to refer to any user-defined functions. -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
@@ -81,6 +83,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
     <!-- Build dependencies -->
     <dependency>
       <groupId>com.google.auto.value</groupId>
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
new file mode 100644
index 00000000000..21c73509511
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A buffering outbound {@link FnDataReceiver} for the Beam Fn Data API.
+ *
+ * <p>Encodes individually consumed elements with the provided {@link Coder} producing
+ * a single {@link BeamFnApi.Elements} message when the buffer threshold
+ * is surpassed.
+ *
+ * <p>The default buffer threshold can be overridden by specifying the experiment
+ * {@code beam_fn_api_data_buffer_limit=<bytes>}
+ *
+ * <p>TODO: Handle outputting large elements (&gt; 2GiBs). Note that this also applies to the
+ * input side as well.
+ *
+ * <p>TODO: Handle outputting elements that are zero bytes by outputting a single byte as
+ * a marker, detect on the input side that no bytes were read and force reading a single byte.
+ */
+public class BeamFnDataBufferingOutboundObserver<T>
+    implements FnDataReceiver<WindowedValue<T>> {
+  // TODO: Consider moving this constant out of this class
+  public static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
+  @VisibleForTesting
+  static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);
+
+  public static <T> BeamFnDataBufferingOutboundObserver<T> forLocation(
+      LogicalEndpoint endpoint,
+      Coder<WindowedValue<T>> coder,
+      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+    return forLocationWithBufferLimit(
+        DEFAULT_BUFFER_LIMIT_BYTES, endpoint, coder, outboundObserver);
+  }
+
+  public static <T> BeamFnDataBufferingOutboundObserver<T> forLocationWithBufferLimit(
+      int bufferLimit,
+      LogicalEndpoint endpoint,
+      Coder<WindowedValue<T>> coder,
+      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+    return new BeamFnDataBufferingOutboundObserver<>(
+        bufferLimit, endpoint, coder, outboundObserver);
+  }
+
+  private long byteCounter;
+  private long counter;
+  private final int bufferLimit;
+  private final Coder<WindowedValue<T>> coder;
+  private final LogicalEndpoint outputLocation;
+  private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+  private final ByteString.Output bufferedElements;
+
+  private BeamFnDataBufferingOutboundObserver(
+      int bufferLimit,
+      LogicalEndpoint outputLocation,
+      Coder<WindowedValue<T>> coder,
+      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+    this.bufferLimit = bufferLimit;
+    this.outputLocation = outputLocation;
+    this.coder = coder;
+    this.outboundObserver = outboundObserver;
+    this.bufferedElements = ByteString.newOutput();
+  }
+
+  @Override
+  public void close() throws Exception {
+    BeamFnApi.Elements.Builder elements = convertBufferForTransmission();
+    // This will add an empty data block representing the end of stream.
+    elements.addDataBuilder()
+        .setInstructionReference(outputLocation.getInstructionId())
+        .setTarget(outputLocation.getTarget());
+
+    LOG.debug("Closing stream for instruction {} and "
+        + "target {} having transmitted {} values {} bytes",
+        outputLocation.getInstructionId(),
+        outputLocation.getTarget(),
+        counter,
+        byteCounter);
+    outboundObserver.onNext(elements.build());
+  }
+
+  @Override
+  public void accept(WindowedValue<T> t) throws IOException {
+    coder.encode(t, bufferedElements);
+    counter += 1;
+    if (bufferedElements.size() >= bufferLimit) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  private BeamFnApi.Elements.Builder convertBufferForTransmission() {
+    BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
+    if (bufferedElements.size() == 0) {
+      return elements;
+    }
+
+    elements.addDataBuilder()
+        .setInstructionReference(outputLocation.getInstructionId())
+        .setTarget(outputLocation.getTarget())
+        .setData(bufferedElements.toByteString());
+
+    byteCounter += bufferedElements.size();
+    bufferedElements.reset();
+    return elements;
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataReceiver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/FnDataReceiver.java
similarity index 75%
rename from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataReceiver.java
rename to sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/FnDataReceiver.java
index 5573d94f769..4f568429dfa 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataReceiver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/FnDataReceiver.java
@@ -15,13 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.fnexecution.data;
-
-import java.io.Closeable;
+package org.apache.beam.sdk.fn.data;
 
 /**
  * A receiver of streamed data.
+ *
+ * <p>A {@link FnDataReceiver} should have an idempotent {@link #close()} method.
  */
-public interface FnDataReceiver<T> extends Closeable {
+public interface FnDataReceiver<T> extends AutoCloseable {
   void accept(T input) throws Exception;
+
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>{@link #close()} must be idempotent.
+   */
+  void close() throws Exception;
 }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
new file mode 100644
index 00000000000..0f92a7cd874
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataBufferingOutboundObserver}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataBufferingOutboundObserverTest {
+  private static final LogicalEndpoint OUTPUT_LOCATION =
+      LogicalEndpoint.of(
+          "777L",
+          Target.newBuilder()
+              .setPrimitiveTransformReference("555L")
+              .setName("Test")
+              .build());
+  private static final Coder<WindowedValue<byte[]>> CODER =
+      LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(ByteArrayCoder.of()));
+
+  @Test
+  public void testWithDefaultBuffer() throws Exception {
+    final Collection<BeamFnApi.Elements> values = new ArrayList<>();
+    final AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    FnDataReceiver<WindowedValue<byte[]>> consumer =
+        BeamFnDataBufferingOutboundObserver.forLocation(
+            OUTPUT_LOCATION,
+            CODER,
+            TestStreams.withOnNext(addToValuesConsumer(values))
+                .withOnCompleted(setBooleanToTrue(onCompletedWasCalled))
+                .build());
+
+    // Test that nothing is emitted till the default buffer size is surpassed.
+    consumer.accept(
+        valueInGlobalWindow(
+            new byte[BeamFnDataBufferingOutboundObserver.DEFAULT_BUFFER_LIMIT_BYTES - 50]));
+    assertThat(values, empty());
+
+    // Test that when we cross the buffer, we emit.
+    consumer.accept(valueInGlobalWindow(new byte[50]));
+    assertEquals(
+        messageWithData(
+            new byte[BeamFnDataBufferingOutboundObserver.DEFAULT_BUFFER_LIMIT_BYTES - 50],
+            new byte[50]),
+        Iterables.get(values, 0));
+
+    // Test that nothing is emitted till the default buffer size is surpassed after a reset
+    consumer.accept(
+        valueInGlobalWindow(
+            new byte[BeamFnDataBufferingOutboundObserver.DEFAULT_BUFFER_LIMIT_BYTES - 50]));
+    assertEquals(1, values.size());
+
+    // Test that when we cross the buffer, we emit.
+    consumer.accept(valueInGlobalWindow(new byte[50]));
+    assertEquals(
+        messageWithData(
+            new byte[BeamFnDataBufferingOutboundObserver.DEFAULT_BUFFER_LIMIT_BYTES - 50],
+            new byte[50]),
+        Iterables.get(values, 1));
+
+    // Test that when we close with an empty buffer we only have one end of stream
+    consumer.close();
+    assertEquals(messageWithData(),
+        Iterables.get(values, 2));
+  }
+
+  @Test
+  public void testConfiguredBufferLimit() throws Exception {
+    Collection<BeamFnApi.Elements> values = new ArrayList<>();
+    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    FnDataReceiver<WindowedValue<byte[]>> consumer =
+        BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
+            100,
+            OUTPUT_LOCATION,
+            CODER,
+            TestStreams.withOnNext(addToValuesConsumer(values))
+                .withOnCompleted(setBooleanToTrue(onCompletedWasCalled))
+                .build());
+
+    // Test that nothing is emitted till the default buffer size is surpassed.
+    consumer.accept(valueInGlobalWindow(new byte[51]));
+    assertThat(values, empty());
+
+    // Test that when we cross the buffer, we emit.
+    consumer.accept(valueInGlobalWindow(new byte[49]));
+    assertEquals(
+        messageWithData(new byte[51], new byte[49]),
+        Iterables.get(values, 0));
+
+    // Test that when we close we empty the value, and then the stream terminator as part
+    // of the same message
+    consumer.accept(valueInGlobalWindow(new byte[1]));
+    consumer.close();
+    assertEquals(
+        BeamFnApi.Elements.newBuilder(messageWithData(new byte[1]))
+            .addData(BeamFnApi.Elements.Data.newBuilder()
+                .setInstructionReference(OUTPUT_LOCATION.getInstructionId())
+                .setTarget(OUTPUT_LOCATION.getTarget()))
+            .build(),
+        Iterables.get(values, 1));
+  }
+
+  private static BeamFnApi.Elements messageWithData(byte[] ... datum) throws IOException {
+    ByteString.Output output = ByteString.newOutput();
+    for (byte[] data : datum) {
+      CODER.encode(valueInGlobalWindow(data), output);
+    }
+    return BeamFnApi.Elements.newBuilder()
+        .addData(BeamFnApi.Elements.Data.newBuilder()
+            .setInstructionReference(OUTPUT_LOCATION.getInstructionId())
+            .setTarget(OUTPUT_LOCATION.getTarget())
+            .setData(output.toByteString()))
+        .build();
+  }
+
+  private Consumer<Elements> addToValuesConsumer(final Collection<Elements> values) {
+    return new Consumer<Elements>() {
+      @Override
+      public void accept(Elements item) {
+        values.add(item);
+      }
+    };
+  }
+
+  private Runnable setBooleanToTrue(final AtomicBoolean onCompletedWasCalled) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        onCompletedWasCalled.set(true);
+      }
+    };
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>              Labels: portability
>
> To make the portability effort tractable, we should implement a Universal Local Runner (ULR) in Java that runs in a single server process plus Docker containers for the SDK harnesses. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature should be implemented in the ULR first.
>   (2) A fully-featured test runner for SDKs that participate in the portability framework. It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime environment. For example, a DoFn that shells out has a dependency that may be satisfied on the user's desktop (and thus works fine on the direct runner), but perhaps not by the container harness image. The ULR allows for an easy way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)