You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/13 18:14:46 UTC

[beam] branch master updated: [BEAM-9309] Remove the READ urn/payload from the Java SDK harness.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 56313cc  [BEAM-9309] Remove the READ urn/payload from the Java SDK harness.
     new 0f7d423  Merge pull request #13074 from lukecwik/beam9309
56313cc is described below

commit 56313cc509be7ec54601a4635a4e9a5506b67647
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Oct 12 09:16:13 2020 -0700

    [BEAM-9309] Remove the READ urn/payload from the Java SDK harness.
    
    This only covers part of what is necessary for BEAM-9309.
---
 .../beam/fn/harness/BoundedSourceRunner.java       | 184 ------------------
 .../beam/fn/harness/BoundedSourceRunnerTest.java   | 211 ---------------------
 2 files changed, 395 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
deleted file mode 100644
index ee63a6f..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.control.BundleSplitListener;
-import org.apache.beam.fn.harness.control.ProcessBundleHandler;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.data.BeamFnTimerClient;
-import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
-import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
-import org.apache.beam.fn.harness.state.BeamFnStateClient;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
-import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.ReadTranslation;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.function.ThrowingRunnable;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source.Reader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and
- * executes the {@link Reader}s read loop.
- */
-public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
-
-  /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */
-  @AutoService(PTransformRunnerFactory.Registrar.class)
-  public static class Registrar implements PTransformRunnerFactory.Registrar {
-
-    @Override
-    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
-      return ImmutableMap.of(
-          ProcessBundleHandler.JAVA_SOURCE_URN, new Factory(),
-          PTransformTranslation.READ_TRANSFORM_URN, new Factory());
-    }
-  }
-
-  /** A factory for {@link BoundedSourceRunner}. */
-  static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
-      implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {
-    @Override
-    public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
-        PipelineOptions pipelineOptions,
-        BeamFnDataClient beamFnDataClient,
-        BeamFnStateClient beamFnStateClient,
-        BeamFnTimerClient beamFnTimerClient,
-        String pTransformId,
-        PTransform pTransform,
-        Supplier<String> processBundleInstructionId,
-        Map<String, PCollection> pCollections,
-        Map<String, Coder> coders,
-        Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        PCollectionConsumerRegistry pCollectionConsumerRegistry,
-        PTransformFunctionRegistry startFunctionRegistry,
-        PTransformFunctionRegistry finishFunctionRegistry,
-        Consumer<ThrowingRunnable> addResetFunction,
-        Consumer<ThrowingRunnable> tearDownFunctions,
-        Consumer<ProgressRequestCallback> addProgressRequestCallback,
-        BundleSplitListener splitListener,
-        BundleFinalizer bundleFinalizer) {
-      ImmutableList.Builder<FnDataReceiver<WindowedValue<?>>> consumers = ImmutableList.builder();
-      for (String pCollectionId : pTransform.getOutputsMap().values()) {
-        consumers.add(pCollectionConsumerRegistry.getMultiplexingConsumer(pCollectionId));
-      }
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      BoundedSourceRunner<InputT, OutputT> runner =
-          new BoundedSourceRunner(pipelineOptions, pTransform.getSpec(), consumers.build());
-
-      // TODO: Remove and replace with source being sent across gRPC port
-      startFunctionRegistry.register(pTransformId, runner::start);
-
-      FnDataReceiver runReadLoop = (FnDataReceiver<WindowedValue<InputT>>) runner::runReadLoop;
-      for (String pCollectionId : pTransform.getInputsMap().values()) {
-        pCollectionConsumerRegistry.register(pCollectionId, pTransformId, runReadLoop);
-      }
-
-      return runner;
-    }
-  }
-
-  private final PipelineOptions pipelineOptions;
-  private final RunnerApi.FunctionSpec definition;
-  private final Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers;
-
-  BoundedSourceRunner(
-      PipelineOptions pipelineOptions,
-      RunnerApi.FunctionSpec definition,
-      Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers) {
-    this.pipelineOptions = pipelineOptions;
-    this.definition = definition;
-    this.consumers = consumers;
-  }
-
-  /**
-   * @deprecated The runner harness is meant to send the source over the Beam Fn Data API which
-   *     would be consumed by the {@link #runReadLoop}. Drop this method once the runner harness
-   *     sends the source instead of unpacking it from the data block of the function specification.
-   */
-  @Deprecated
-  public void start() throws Exception {
-    try {
-      // The representation here is defined as the java serialized representation of the
-      // bounded source object in a ByteString wrapper.
-      InputT boundedSource;
-      if (definition.getUrn().equals(ProcessBundleHandler.JAVA_SOURCE_URN)) {
-        byte[] bytes = definition.getPayload().toByteArray();
-        @SuppressWarnings("unchecked")
-        InputT boundedSource0 =
-            (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
-        boundedSource = boundedSource0;
-      } else if (definition.getUrn().equals(PTransformTranslation.READ_TRANSFORM_URN)) {
-        ReadPayload readPayload = ReadPayload.parseFrom(definition.getPayload());
-        boundedSource = (InputT) ReadTranslation.boundedSourceFromProto(readPayload);
-      } else {
-        throw new IllegalArgumentException("Unknown source URN: " + definition.getUrn());
-      }
-      runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
-    } catch (InvalidProtocolBufferException e) {
-      throw new IOException(String.format("Failed to decode %s", definition.getUrn()), e);
-    }
-  }
-
-  /**
-   * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s read
-   * loop. See {@link Reader} for further details of the read loop.
-   *
-   * <p>Propagates any exceptions caused during reading or processing via a consumer to the caller.
-   */
-  public void runReadLoop(WindowedValue<InputT> value) throws Exception {
-    try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
-      if (!reader.start()) {
-        // Reader has no data, immediately return
-        return;
-      }
-      do {
-        // TODO: Should this use the input window as the window for all the outputs?
-        WindowedValue<OutputT> nextValue =
-            WindowedValue.timestampedValueInGlobalWindow(
-                reader.getCurrent(), reader.getCurrentTimestamp());
-        for (FnDataReceiver<WindowedValue<OutputT>> consumer : consumers) {
-          consumer.accept(nextValue);
-        }
-      } while (reader.advance());
-    }
-  }
-
-  @Override
-  public String toString() {
-    return definition.toString();
-  }
-}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
deleted file mode 100644
index 99528f5..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.ServiceLoader;
-import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
-import org.apache.beam.fn.harness.control.ProcessBundleHandler;
-import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
-import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.function.ThrowingRunnable;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link BoundedSourceRunner}. */
-@RunWith(JUnit4.class)
-public class BoundedSourceRunnerTest {
-
-  public static final String URN = "beam:source:java:0.1";
-
-  @Test
-  public void testRunReadLoopWithMultipleSources() throws Exception {
-    List<WindowedValue<Long>> out1Values = new ArrayList<>();
-    List<WindowedValue<Long>> out2Values = new ArrayList<>();
-    Collection<FnDataReceiver<WindowedValue<Long>>> consumers =
-        ImmutableList.of(out1Values::add, out2Values::add);
-
-    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
-        new BoundedSourceRunner<>(
-            PipelineOptionsFactory.create(),
-            RunnerApi.FunctionSpec.getDefaultInstance(),
-            consumers);
-
-    runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
-    runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1)));
-
-    assertThat(
-        out1Values,
-        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
-    assertThat(
-        out2Values,
-        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
-  }
-
-  @Test
-  public void testRunReadLoopWithEmptySource() throws Exception {
-    List<WindowedValue<Long>> outValues = new ArrayList<>();
-    Collection<FnDataReceiver<WindowedValue<Long>>> consumers = ImmutableList.of(outValues::add);
-
-    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
-        new BoundedSourceRunner<>(
-            PipelineOptionsFactory.create(),
-            RunnerApi.FunctionSpec.getDefaultInstance(),
-            consumers);
-
-    runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
-
-    assertThat(outValues, empty());
-  }
-
-  @Test
-  public void testStart() throws Exception {
-    List<WindowedValue<Long>> outValues = new ArrayList<>();
-    Collection<FnDataReceiver<WindowedValue<Long>>> consumers = ImmutableList.of(outValues::add);
-
-    ByteString encodedSource =
-        ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)));
-
-    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
-        new BoundedSourceRunner<>(
-            PipelineOptionsFactory.create(),
-            RunnerApi.FunctionSpec.newBuilder()
-                .setUrn(ProcessBundleHandler.JAVA_SOURCE_URN)
-                .setPayload(encodedSource)
-                .build(),
-            consumers);
-
-    runner.start();
-
-    assertThat(
-        outValues,
-        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
-  }
-
-  @Test
-  public void testCreatingAndProcessingSourceFromFactory() throws Exception {
-    List<WindowedValue<String>> outputValues = new ArrayList<>();
-
-    MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
-    PCollectionConsumerRegistry consumers =
-        new PCollectionConsumerRegistry(
-            metricsContainerRegistry, mock(ExecutionStateTracker.class));
-    consumers.register(
-        "outputPC",
-        "pTransformId",
-        (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) outputValues::add);
-    PTransformFunctionRegistry startFunctionRegistry =
-        new PTransformFunctionRegistry(
-            mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
-    PTransformFunctionRegistry finishFunctionRegistry =
-        new PTransformFunctionRegistry(
-            mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
-    List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
-
-    RunnerApi.FunctionSpec functionSpec =
-        RunnerApi.FunctionSpec.newBuilder()
-            .setUrn("beam:source:java:0.1")
-            .setPayload(
-                ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
-            .build();
-
-    RunnerApi.PTransform pTransform =
-        RunnerApi.PTransform.newBuilder()
-            .setSpec(functionSpec)
-            .putInputs("input", "inputPC")
-            .putOutputs("output", "outputPC")
-            .build();
-
-    new BoundedSourceRunner.Factory<>()
-        .createRunnerForPTransform(
-            PipelineOptionsFactory.create(),
-            null /* beamFnDataClient */,
-            null /* beamFnStateClient */,
-            null /* beamFnTimerClient */,
-            "pTransformId",
-            pTransform,
-            Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            consumers,
-            startFunctionRegistry,
-            finishFunctionRegistry,
-            null /* addResetFunction */,
-            teardownFunctions::add,
-            null /* addProgressRequestCallback */,
-            null /* splitListener */,
-            null /* bundleFinalizer */);
-
-    // This is testing a deprecated way of running sources and should be removed
-    // once all source definitions are instead propagated along the input edge.
-    Iterables.getOnlyElement(startFunctionRegistry.getFunctions()).run();
-    assertThat(
-        outputValues,
-        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
-    outputValues.clear();
-
-    // Check that when passing a source along as an input, the source is processed.
-    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
-    consumers
-        .getMultiplexingConsumer("inputPC")
-        .accept(valueInGlobalWindow(CountingSource.upTo(2)));
-    assertThat(outputValues, contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L)));
-
-    assertThat(finishFunctionRegistry.getFunctions(), Matchers.empty());
-    assertThat(teardownFunctions, Matchers.empty());
-  }
-
-  @Test
-  public void testRegistration() {
-    for (Registrar registrar : ServiceLoader.load(Registrar.class)) {
-      if (registrar instanceof BoundedSourceRunner.Registrar) {
-        assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
-        return;
-      }
-    }
-    fail("Expected registrar not found.");
-  }
-}