You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/13 18:14:46 UTC
[beam] branch master updated: [BEAM-9309] Remove the READ urn/payload from the Java SDK harness.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 56313cc [BEAM-9309] Remove the READ urn/payload from the Java SDK harness.
new 0f7d423 Merge pull request #13074 from lukecwik/beam9309
56313cc is described below
commit 56313cc509be7ec54601a4635a4e9a5506b67647
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Oct 12 09:16:13 2020 -0700
[BEAM-9309] Remove the READ urn/payload from the Java SDK harness.
This only covers part of what is necessary for BEAM-9309.
---
.../beam/fn/harness/BoundedSourceRunner.java | 184 ------------------
.../beam/fn/harness/BoundedSourceRunnerTest.java | 211 ---------------------
2 files changed, 395 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
deleted file mode 100644
index ee63a6f..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.control.BundleSplitListener;
-import org.apache.beam.fn.harness.control.ProcessBundleHandler;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.data.BeamFnTimerClient;
-import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
-import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
-import org.apache.beam.fn.harness.state.BeamFnStateClient;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
-import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.ReadTranslation;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.function.ThrowingRunnable;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source.Reader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * A runner which creates a {@link Reader} for each {@link BoundedSource} it receives and executes
- * that reader's read loop, emitting every element read to all downstream consumers.
- */
-public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
-
- /** Registers {@link Factory} under both the Java source URN and the READ transform URN. */
- @AutoService(PTransformRunnerFactory.Registrar.class)
- public static class Registrar implements PTransformRunnerFactory.Registrar {
-
- @Override
- public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
- return ImmutableMap.of(
- ProcessBundleHandler.JAVA_SOURCE_URN, new Factory(),
- PTransformTranslation.READ_TRANSFORM_URN, new Factory());
- }
- }
-
- /** Builds a {@link BoundedSourceRunner} and wires it into the start and PCollection registries. */
- static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
- implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {
- @Override
- public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
- PipelineOptions pipelineOptions,
- BeamFnDataClient beamFnDataClient,
- BeamFnStateClient beamFnStateClient,
- BeamFnTimerClient beamFnTimerClient,
- String pTransformId,
- PTransform pTransform,
- Supplier<String> processBundleInstructionId,
- Map<String, PCollection> pCollections,
- Map<String, Coder> coders,
- Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
- PCollectionConsumerRegistry pCollectionConsumerRegistry,
- PTransformFunctionRegistry startFunctionRegistry,
- PTransformFunctionRegistry finishFunctionRegistry,
- Consumer<ThrowingRunnable> addResetFunction,
- Consumer<ThrowingRunnable> tearDownFunctions,
- Consumer<ProgressRequestCallback> addProgressRequestCallback,
- BundleSplitListener splitListener,
- BundleFinalizer bundleFinalizer) {
- ImmutableList.Builder<FnDataReceiver<WindowedValue<?>>> consumers = ImmutableList.builder();
- for (String pCollectionId : pTransform.getOutputsMap().values()) { // one consumer per output
- consumers.add(pCollectionConsumerRegistry.getMultiplexingConsumer(pCollectionId));
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- BoundedSourceRunner<InputT, OutputT> runner =
- new BoundedSourceRunner(pipelineOptions, pTransform.getSpec(), consumers.build());
-
- // TODO: Remove and replace with source being sent across gRPC port
- startFunctionRegistry.register(pTransformId, runner::start);
-
- FnDataReceiver runReadLoop = (FnDataReceiver<WindowedValue<InputT>>) runner::runReadLoop;
- for (String pCollectionId : pTransform.getInputsMap().values()) { // inputs carry sources
- pCollectionConsumerRegistry.register(pCollectionId, pTransformId, runReadLoop);
- }
-
- return runner;
- }
- }
-
- private final PipelineOptions pipelineOptions;
- private final RunnerApi.FunctionSpec definition; // URN + payload decoded by start()
- private final Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers;
-
- BoundedSourceRunner(
- PipelineOptions pipelineOptions,
- RunnerApi.FunctionSpec definition,
- Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers) {
- this.pipelineOptions = pipelineOptions;
- this.definition = definition;
- this.consumers = consumers;
- }
-
- /**
- * @deprecated The runner harness is meant to send the source over the Beam Fn Data API which
- * would be consumed by the {@link #runReadLoop}. Drop this method once the runner harness
- * sends the source instead of unpacking it from the payload of the function specification.
- */
- @Deprecated
- public void start() throws Exception {
- try {
- // JAVA_SOURCE_URN payloads hold a Java-serialized BoundedSource; READ_TRANSFORM_URN payloads
- // hold a ReadPayload proto. Any other URN is rejected with IllegalArgumentException.
- InputT boundedSource;
- if (definition.getUrn().equals(ProcessBundleHandler.JAVA_SOURCE_URN)) {
- byte[] bytes = definition.getPayload().toByteArray();
- @SuppressWarnings("unchecked")
- InputT boundedSource0 =
- (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
- boundedSource = boundedSource0;
- } else if (definition.getUrn().equals(PTransformTranslation.READ_TRANSFORM_URN)) {
- ReadPayload readPayload = ReadPayload.parseFrom(definition.getPayload());
- boundedSource = (InputT) ReadTranslation.boundedSourceFromProto(readPayload); // NOTE(review): unchecked cast, warning not suppressed here
- } else {
- throw new IllegalArgumentException("Unknown source URN: " + definition.getUrn());
- }
- runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
- } catch (InvalidProtocolBufferException e) { // e.g. a malformed ReadPayload
- throw new IOException(String.format("Failed to decode %s", definition.getUrn()), e);
- }
- }
-
- /**
- * Creates a {@link Reader} for the given {@link BoundedSource} and executes its read loop,
- * emitting each element in the global window, with the reader's timestamp, to every consumer.
- *
- * <p>The reader is always closed. Exceptions from reading or from any consumer propagate.
- */
- public void runReadLoop(WindowedValue<InputT> value) throws Exception {
- try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
- if (!reader.start()) {
- // Reader has no data, immediately return
- return;
- }
- do {
- // TODO: Should this use the input window as the window for all the outputs?
- WindowedValue<OutputT> nextValue =
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp());
- for (FnDataReceiver<WindowedValue<OutputT>> consumer : consumers) {
- consumer.accept(nextValue);
- }
- } while (reader.advance());
- }
- }
-
- @Override
- public String toString() {
- return definition.toString();
- }
-}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
deleted file mode 100644
index 99528f5..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.ServiceLoader;
-import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
-import org.apache.beam.fn.harness.control.ProcessBundleHandler;
-import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
-import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.function.ThrowingRunnable;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link BoundedSourceRunner}, its {@link BoundedSourceRunner.Factory}, and its registrar. */
-@RunWith(JUnit4.class)
-public class BoundedSourceRunnerTest {
-
- public static final String URN = "beam:source:java:0.1"; // NOTE(review): presumably equals ProcessBundleHandler.JAVA_SOURCE_URN; verify
-
- @Test
- public void testRunReadLoopWithMultipleSources() throws Exception {
- List<WindowedValue<Long>> out1Values = new ArrayList<>();
- List<WindowedValue<Long>> out2Values = new ArrayList<>();
- Collection<FnDataReceiver<WindowedValue<Long>>> consumers =
- ImmutableList.of(out1Values::add, out2Values::add);
-
- BoundedSourceRunner<BoundedSource<Long>, Long> runner =
- new BoundedSourceRunner<>(
- PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.getDefaultInstance(),
- consumers);
-
- runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
- runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1)));
-
- assertThat(
- out1Values,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
- assertThat(
- out2Values,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
- }
-
- @Test
- public void testRunReadLoopWithEmptySource() throws Exception {
- List<WindowedValue<Long>> outValues = new ArrayList<>();
- Collection<FnDataReceiver<WindowedValue<Long>>> consumers = ImmutableList.of(outValues::add);
-
- BoundedSourceRunner<BoundedSource<Long>, Long> runner =
- new BoundedSourceRunner<>(
- PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.getDefaultInstance(),
- consumers);
-
- runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
-
- assertThat(outValues, empty());
- }
-
- @Test
- public void testStart() throws Exception {
- List<WindowedValue<Long>> outValues = new ArrayList<>();
- Collection<FnDataReceiver<WindowedValue<Long>>> consumers = ImmutableList.of(outValues::add);
-
- ByteString encodedSource =
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)));
-
- BoundedSourceRunner<BoundedSource<Long>, Long> runner =
- new BoundedSourceRunner<>(
- PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.newBuilder()
- .setUrn(ProcessBundleHandler.JAVA_SOURCE_URN)
- .setPayload(encodedSource)
- .build(),
- consumers);
-
- runner.start();
-
- assertThat(
- outValues,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
- }
-
- @Test
- public void testCreatingAndProcessingSourceFromFactory() throws Exception {
- List<WindowedValue<String>> outputValues = new ArrayList<>();
-
- MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
- PCollectionConsumerRegistry consumers =
- new PCollectionConsumerRegistry(
- metricsContainerRegistry, mock(ExecutionStateTracker.class));
- consumers.register(
- "outputPC",
- "pTransformId",
- (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) outputValues::add); // NOTE(review): raw cast; elements are actually WindowedValue<Long>
- PTransformFunctionRegistry startFunctionRegistry =
- new PTransformFunctionRegistry(
- mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
- PTransformFunctionRegistry finishFunctionRegistry =
- new PTransformFunctionRegistry(
- mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
- List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
-
- RunnerApi.FunctionSpec functionSpec =
- RunnerApi.FunctionSpec.newBuilder()
- .setUrn("beam:source:java:0.1") // same value as URN
- .setPayload(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
- .build();
-
- RunnerApi.PTransform pTransform =
- RunnerApi.PTransform.newBuilder()
- .setSpec(functionSpec)
- .putInputs("input", "inputPC")
- .putOutputs("output", "outputPC")
- .build();
-
- new BoundedSourceRunner.Factory<>()
- .createRunnerForPTransform(
- PipelineOptionsFactory.create(),
- null /* beamFnDataClient */,
- null /* beamFnStateClient */,
- null /* beamFnTimerClient */,
- "pTransformId",
- pTransform,
- Suppliers.ofInstance("57L")::get, // processBundleInstructionId
- Collections.emptyMap(), // pCollections
- Collections.emptyMap(), // coders
- Collections.emptyMap(), // windowingStrategies
- consumers,
- startFunctionRegistry,
- finishFunctionRegistry,
- null /* addResetFunction */,
- teardownFunctions::add,
- null /* addProgressRequestCallback */,
- null /* splitListener */,
- null /* bundleFinalizer */);
-
- // This is testing a deprecated way of running sources and should be removed
- // once all source definitions are instead propagated along the input edge.
- Iterables.getOnlyElement(startFunctionRegistry.getFunctions()).run();
- assertThat(
- outputValues,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
- outputValues.clear();
-
- // Check that when passing a source along as an input, the source is processed.
- assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
- consumers
- .getMultiplexingConsumer("inputPC")
- .accept(valueInGlobalWindow(CountingSource.upTo(2)));
- assertThat(outputValues, contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L)));
-
- assertThat(finishFunctionRegistry.getFunctions(), Matchers.empty());
- assertThat(teardownFunctions, Matchers.empty());
- }
-
- @Test
- public void testRegistration() {
- for (Registrar registrar : ServiceLoader.load(Registrar.class)) {
- if (registrar instanceof BoundedSourceRunner.Registrar) {
- assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
- return;
- }
- }
- fail("Expected registrar not found.");
- }
-}