You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:01 UTC
[05/50] [abbrv] beam git commit: Fix split package in SDK harness
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
deleted file mode 100644
index 64d9ea6..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ServiceLoader;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link BeamFnDataWriteRunner}. */
-@RunWith(JUnit4.class)
-public class BeamFnDataWriteRunnerTest {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
- .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
- private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
- .setParameter(Any.pack(PORT_SPEC)).build();
- private static final String CODER_ID = "string-coder-id";
- private static final Coder<WindowedValue<String>> CODER =
- WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
- private static final RunnerApi.Coder CODER_SPEC;
- private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
-
- static {
- try {
- CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
- RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
- RunnerApi.FunctionSpec.newBuilder().setParameter(
- Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
- .build()))
- .build())
- .build())
- .build();
- } catch (IOException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
- private static final BeamFnApi.Target OUTPUT_TARGET = BeamFnApi.Target.newBuilder()
- .setPrimitiveTransformReference("1")
- .setName("out")
- .build();
-
- @Mock private BeamFnDataClient mockBeamFnDataClient;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- }
-
-
- @Test
- public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
- String bundleId = "57L";
- String inputId = "100L";
-
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
- List<ThrowingRunnable> startFunctions = new ArrayList<>();
- List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn("urn:org.apache.beam:sink:runner:0.1")
- .setParameter(Any.pack(PORT_SPEC))
- .build();
-
- RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
- .setSpec(functionSpec)
- .putInputs(inputId, "inputPC")
- .build();
-
- new BeamFnDataWriteRunner.Factory<String>().createRunnerForPTransform(
- PipelineOptionsFactory.create(),
- mockBeamFnDataClient,
- "ptransformId",
- pTransform,
- Suppliers.ofInstance(bundleId)::get,
- ImmutableMap.of("inputPC",
- RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()),
- ImmutableMap.of(CODER_ID, CODER_SPEC),
- consumers,
- startFunctions::add,
- finishFunctions::add);
-
- verifyZeroInteractions(mockBeamFnDataClient);
-
- List<WindowedValue<String>> outputValues = new ArrayList<>();
- AtomicBoolean wasCloseCalled = new AtomicBoolean();
- CloseableThrowingConsumer<WindowedValue<String>> outputConsumer =
- new CloseableThrowingConsumer<WindowedValue<String>>(){
- @Override
- public void close() throws Exception {
- wasCloseCalled.set(true);
- }
-
- @Override
- public void accept(WindowedValue<String> t) throws Exception {
- outputValues.add(t);
- }
- };
-
- when(mockBeamFnDataClient.forOutboundConsumer(
- any(),
- any(),
- Matchers.<Coder<WindowedValue<String>>>any())).thenReturn(outputConsumer);
- Iterables.getOnlyElement(startFunctions).run();
- verify(mockBeamFnDataClient).forOutboundConsumer(
- eq(PORT_SPEC.getApiServiceDescriptor()),
- eq(KV.of(bundleId, BeamFnApi.Target.newBuilder()
- .setPrimitiveTransformReference("ptransformId")
- .setName(inputId)
- .build())),
- eq(CODER));
-
- assertThat(consumers.keySet(), containsInAnyOrder("inputPC"));
- Iterables.getOnlyElement(consumers.get("inputPC")).accept(valueInGlobalWindow("TestValue"));
- assertThat(outputValues, contains(valueInGlobalWindow("TestValue")));
- outputValues.clear();
-
- assertFalse(wasCloseCalled.get());
- Iterables.getOnlyElement(finishFunctions).run();
- assertTrue(wasCloseCalled.get());
-
- verifyNoMoreInteractions(mockBeamFnDataClient);
- }
-
- @Test
- public void testReuseForMultipleBundles() throws Exception {
- RecordingConsumer<WindowedValue<String>> valuesA = new RecordingConsumer<>();
- RecordingConsumer<WindowedValue<String>> valuesB = new RecordingConsumer<>();
- when(mockBeamFnDataClient.forOutboundConsumer(
- any(),
- any(),
- Matchers.<Coder<WindowedValue<String>>>any())).thenReturn(valuesA).thenReturn(valuesB);
- AtomicReference<String> bundleId = new AtomicReference<>("0");
- BeamFnDataWriteRunner<String> writeRunner = new BeamFnDataWriteRunner<>(
- FUNCTION_SPEC,
- bundleId::get,
- OUTPUT_TARGET,
- CODER_SPEC,
- mockBeamFnDataClient);
-
- // Process for bundle id 0
- writeRunner.registerForOutput();
-
- verify(mockBeamFnDataClient).forOutboundConsumer(
- eq(PORT_SPEC.getApiServiceDescriptor()),
- eq(KV.of(bundleId.get(), OUTPUT_TARGET)),
- eq(CODER));
-
- writeRunner.consume(valueInGlobalWindow("ABC"));
- writeRunner.consume(valueInGlobalWindow("DEF"));
- writeRunner.close();
-
- assertTrue(valuesA.closed);
- assertThat(valuesA, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF")));
-
- // Process for bundle id 1
- bundleId.set("1");
- valuesA.clear();
- valuesB.clear();
- writeRunner.registerForOutput();
-
- verify(mockBeamFnDataClient).forOutboundConsumer(
- eq(PORT_SPEC.getApiServiceDescriptor()),
- eq(KV.of(bundleId.get(), OUTPUT_TARGET)),
- eq(CODER));
-
- writeRunner.consume(valueInGlobalWindow("GHI"));
- writeRunner.consume(valueInGlobalWindow("JKL"));
- writeRunner.close();
-
- assertTrue(valuesB.closed);
- assertThat(valuesB, contains(valueInGlobalWindow("GHI"), valueInGlobalWindow("JKL")));
- verifyNoMoreInteractions(mockBeamFnDataClient);
- }
-
- private static class RecordingConsumer<T> extends ArrayList<T>
- implements CloseableThrowingConsumer<T> {
- private boolean closed;
- @Override
- public void close() throws Exception {
- closed = true;
- }
-
- @Override
- public void accept(T t) throws Exception {
- if (closed) {
- throw new IllegalStateException("Consumer is closed but attempting to consume " + t);
- }
- add(t);
- }
- }
-
- @Test
- public void testRegistration() {
- for (Registrar registrar :
- ServiceLoader.load(Registrar.class)) {
- if (registrar instanceof BeamFnDataWriteRunner.Registrar) {
- assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
- return;
- }
- }
- fail("Expected registrar not found.");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
deleted file mode 100644
index 6c9a4cb..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.ServiceLoader;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link BoundedSourceRunner}. */
-@RunWith(JUnit4.class)
-public class BoundedSourceRunnerTest {
-
- public static final String URN = "urn:org.apache.beam:source:java:0.1";
-
- @Test
- public void testRunReadLoopWithMultipleSources() throws Exception {
- List<WindowedValue<Long>> out1Values = new ArrayList<>();
- List<WindowedValue<Long>> out2Values = new ArrayList<>();
- Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
- ImmutableList.of(out1Values::add, out2Values::add);
-
- BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
- PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.getDefaultInstance(),
- consumers);
-
- runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
- runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1)));
-
- assertThat(out1Values,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
- assertThat(out2Values,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
- }
-
- @Test
- public void testRunReadLoopWithEmptySource() throws Exception {
- List<WindowedValue<Long>> outValues = new ArrayList<>();
- Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
- ImmutableList.of(outValues::add);
-
- BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
- PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.getDefaultInstance(),
- consumers);
-
- runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
-
- assertThat(outValues, empty());
- }
-
- @Test
- public void testStart() throws Exception {
- List<WindowedValue<Long>> outValues = new ArrayList<>();
- Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
- ImmutableList.of(outValues::add);
-
- ByteString encodedSource =
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)));
-
- BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
- PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.newBuilder().setParameter(
- Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
- consumers);
-
- runner.start();
-
- assertThat(outValues,
- contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
- }
-
- @Test
- public void testCreatingAndProcessingSourceFromFactory() throws Exception {
- List<WindowedValue<String>> outputValues = new ArrayList<>();
-
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
- consumers.put("outputPC",
- (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) outputValues::add);
- List<ThrowingRunnable> startFunctions = new ArrayList<>();
- List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn("urn:org.apache.beam:source:java:0.1")
- .setParameter(Any.pack(BytesValue.newBuilder()
- .setValue(ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
- .build()))
- .build();
-
- RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
- .setSpec(functionSpec)
- .putInputs("input", "inputPC")
- .putOutputs("output", "outputPC")
- .build();
-
- new BoundedSourceRunner.Factory<>().createRunnerForPTransform(
- PipelineOptionsFactory.create(),
- null /* beamFnDataClient */,
- "pTransformId",
- pTransform,
- Suppliers.ofInstance("57L")::get,
- ImmutableMap.of(),
- ImmutableMap.of(),
- consumers,
- startFunctions::add,
- finishFunctions::add);
-
- // This is testing a deprecated way of running sources and should be removed
- // once all source definitions are instead propagated along the input edge.
- Iterables.getOnlyElement(startFunctions).run();
- assertThat(outputValues, contains(
- valueInGlobalWindow(0L),
- valueInGlobalWindow(1L),
- valueInGlobalWindow(2L)));
- outputValues.clear();
-
- // Check that when passing a source along as an input, the source is processed.
- assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
- Iterables.getOnlyElement(consumers.get("inputPC")).accept(
- valueInGlobalWindow(CountingSource.upTo(2)));
- assertThat(outputValues, contains(
- valueInGlobalWindow(0L),
- valueInGlobalWindow(1L)));
-
- assertThat(finishFunctions, Matchers.empty());
- }
-
- @Test
- public void testRegistration() {
- for (Registrar registrar :
- ServiceLoader.load(Registrar.class)) {
- if (registrar instanceof BoundedSourceRunner.Registrar) {
- assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
- return;
- }
- }
- fail("Expected registrar not found.");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
deleted file mode 100644
index c4df77a..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow;
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.Message;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
-import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
-public class FnApiDoFnRunnerTest {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Coder<WindowedValue<String>> STRING_CODER =
- WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
- private static final String STRING_CODER_SPEC_ID = "999L";
- private static final RunnerApi.Coder STRING_CODER_SPEC;
-
- static {
- try {
- STRING_CODER_SPEC = RunnerApi.Coder.newBuilder()
- .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
- .setSpec(RunnerApi.FunctionSpec.newBuilder()
- .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER))))
- .build())))
- .build())
- .build();
- } catch (IOException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
- private static class TestDoFn extends DoFn<String, String> {
- private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
- private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
-
- private BoundedWindow window;
-
- @ProcessElement
- public void processElement(ProcessContext context, BoundedWindow window) {
- context.output("MainOutput" + context.element());
- context.output(additionalOutput, "AdditionalOutput" + context.element());
- this.window = window;
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext context) {
- if (window != null) {
- context.output("FinishBundle", window.maxTimestamp(), window);
- window = null;
- }
- }
- }
-
- /**
- * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs
- * (mainOutput, output). Validate that inputs are fed to the {@link DoFn} and that outputs
- * are directed to the correct consumers.
- */
- @Test
- public void testCreatingAndProcessingDoFn() throws Exception {
- Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
- String pTransformId = "pTransformId";
- String mainOutputId = "101";
- String additionalOutputId = "102";
-
- DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
- new TestDoFn(),
- WindowingStrategy.globalDefault(),
- ImmutableList.of(),
- StringUtf8Coder.of(),
- Long.parseLong(mainOutputId),
- ImmutableMap.of(
- Long.parseLong(mainOutputId), TestDoFn.mainOutput,
- Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
- .setParameter(Any.pack(BytesValue.newBuilder()
- .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
- .build()))
- .build();
- RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
- .setSpec(functionSpec)
- .putInputs("inputA", "inputATarget")
- .putInputs("inputB", "inputBTarget")
- .putOutputs(mainOutputId, "mainOutputTarget")
- .putOutputs(additionalOutputId, "additionalOutputTarget")
- .build();
-
- List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
- List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
- consumers.put("mainOutputTarget",
- (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) mainOutputValues::add);
- consumers.put("additionalOutputTarget",
- (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) additionalOutputValues::add);
- List<ThrowingRunnable> startFunctions = new ArrayList<>();
- List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
- new FnApiDoFnRunner.Factory<>().createRunnerForPTransform(
- PipelineOptionsFactory.create(),
- null /* beamFnDataClient */,
- pTransformId,
- pTransform,
- Suppliers.ofInstance("57L")::get,
- ImmutableMap.of(),
- ImmutableMap.of(),
- consumers,
- startFunctions::add,
- finishFunctions::add);
-
- Iterables.getOnlyElement(startFunctions).run();
- mainOutputValues.clear();
-
- assertThat(consumers.keySet(), containsInAnyOrder(
- "inputATarget", "inputBTarget", "mainOutputTarget", "additionalOutputTarget"));
-
- Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A1"));
- Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A2"));
- Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("B"));
- assertThat(mainOutputValues, contains(
- valueInGlobalWindow("MainOutputA1"),
- valueInGlobalWindow("MainOutputA2"),
- valueInGlobalWindow("MainOutputB")));
- assertThat(additionalOutputValues, contains(
- valueInGlobalWindow("AdditionalOutputA1"),
- valueInGlobalWindow("AdditionalOutputA2"),
- valueInGlobalWindow("AdditionalOutputB")));
- mainOutputValues.clear();
- additionalOutputValues.clear();
-
- Iterables.getOnlyElement(finishFunctions).run();
- assertThat(
- mainOutputValues,
- contains(
- timestampedValueInGlobalWindow("FinishBundle", GlobalWindow.INSTANCE.maxTimestamp())));
- mainOutputValues.clear();
- }
-
- @Test
- public void testRegistration() {
- for (Registrar registrar :
- ServiceLoader.load(Registrar.class)) {
- if (registrar instanceof FnApiDoFnRunner.Registrar) {
- assertThat(registrar.getPTransformRunnerFactories(),
- IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN));
- return;
- }
- }
- fail("Expected registrar not found.");
- }
-}