You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/10/22 01:41:31 UTC
[beam] branch master updated: [BEAM-13015] Create a multiplexer
that sends Elements based upon instruction id allowing for an inbound
observer responsible for the entire instruction id. (#15747)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6173abb [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747)
6173abb is described below
commit 6173abb2e241865d89dff9ae679f4d422be84ee9
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Thu Oct 21 18:40:10 2021 -0700
[BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747)
* [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id.
* fixup! checkstyle
* Update sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
Co-authored-by: Brian Hulette <hu...@gmail.com>
* fixup! Address PR comments.
Co-authored-by: Brian Hulette <hu...@gmail.com>
---
.../sdk/fn/data/BeamFnDataGrpcMultiplexer2.java | 274 ++++++++++++++++
.../sdk/fn/data/BeamFnDataInboundObserver2.java | 196 +++++++++++
.../org/apache/beam/sdk/fn/data/DataEndpoint.java | 35 ++
.../org/apache/beam/sdk/fn/data/TimerEndpoint.java | 37 +++
.../fn/data/BeamFnDataGrpcMultiplexer2Test.java | 363 +++++++++++++++++++++
.../fn/data/BeamFnDataInboundObserver2Test.java | 249 ++++++++++++++
6 files changed, 1154 insertions(+)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
new file mode 100644
index 0000000..c00db8c
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.CancellableQueue;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}.
+ *
+ * <p>Multiplexes data for inbound consumers based upon their {@code instructionId}.
+ *
+ * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams.
+ * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this
+ * is as thread safe as the underlying stream observer.
+ *
+ * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
+ * location with a specific outbound observer.
+ */
+public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
+ private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
+ private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+ private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+ private final ConcurrentMap<
+ /*instructionId=*/ String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>>
+ receivers;
+ private final ConcurrentMap<String, Boolean> erroredInstructionIds;
+ private final List<CancellableQueue<BeamFnApi.Elements>> unusedQueues;
+
+ public BeamFnDataGrpcMultiplexer2(
+ Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
+ OutboundObserverFactory outboundObserverFactory,
+ OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, BeamFnApi.Elements>
+ baseOutboundObserverFactory) {
+ this.apiServiceDescriptor = apiServiceDescriptor;
+ this.receivers = new ConcurrentHashMap<>();
+ this.inboundObserver = new InboundObserver();
+ this.outboundObserver =
+ outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver);
+ this.erroredInstructionIds = new ConcurrentHashMap<>();
+ this.unusedQueues = new ArrayList<>(100);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .omitNullValues()
+ .add("apiServiceDescriptor", apiServiceDescriptor)
+ .add("consumers", receivers)
+ .toString();
+ }
+
+ public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
+ return inboundObserver;
+ }
+
+ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
+ return outboundObserver;
+ }
+
+ private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(
+ String instructionId) {
+ return receivers.computeIfAbsent(instructionId, (unused) -> new CompletableFuture<>());
+ }
+
+ /**
+ * Registers a consumer for the specified intruction id.
+ *
+ * <p>The {@link BeamFnDataGrpcMultiplexer2} partitions {@link BeamFnApi.Elements} with multiple
+ * instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a
+ * single instruction id.
+ *
+ * <p>The caller must {@link #unregisterConsumer unregister the consumer} when they no longer wish
+ * to receive messages.
+ */
+ public void registerConsumer(
+ String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
+ receiverFuture(instructionId).complete(receiver);
+ }
+
+ /** Unregisters a consumer. */
+ public void unregisterConsumer(String instructionId) {
+ receivers.remove(instructionId);
+ }
+
+ @VisibleForTesting
+ boolean hasConsumer(String instructionId) {
+ return receivers.containsKey(instructionId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Exception exception = null;
+ for (CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiver :
+ ImmutableList.copyOf(receivers.values())) {
+ // Cancel any observer waiting for the client to complete. If the receiver has already been
+ // completed or cancelled, this call will be ignored.
+ receiver.cancel(true);
+ if (!receiver.isCompletedExceptionally()) {
+ try {
+ receiver.get().close();
+ } catch (Exception e) {
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
+ }
+ }
+ // Cancel any outbound calls and complete any inbound calls, as this multiplexer is hanging up
+ outboundObserver.onError(
+ Status.CANCELLED.withDescription("Multiplexer hanging up").asException());
+ inboundObserver.onCompleted();
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ /**
+ * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass the
+ * elements to.
+ *
+ * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the sending
+ * harness to initiate transmitting data without needing for the receiving harness to signal that
+ * it is ready to consume that data.
+ */
+ private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> {
+ @Override
+ public void onNext(BeamFnApi.Elements value) {
+ // Have a fast path to handle the common case and provide a short circuit to exit if we detect
+ // multiple instruction ids.
+ SINGLE_INSTRUCTION_ID:
+ {
+ String instructionId = null;
+ for (BeamFnApi.Elements.Data data : value.getDataList()) {
+ if (instructionId == null) {
+ instructionId = data.getInstructionId();
+ } else if (!instructionId.equals(data.getInstructionId())) {
+ // Multiple instruction ids detected, break out of this block
+ break SINGLE_INSTRUCTION_ID;
+ }
+ }
+ for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
+ if (instructionId == null) {
+ instructionId = timers.getInstructionId();
+ } else if (!instructionId.equals(timers.getInstructionId())) {
+ // Multiple instruction ids detected, break out of this block
+ break SINGLE_INSTRUCTION_ID;
+ }
+ }
+ if (instructionId == null) {
+ return;
+ }
+ forwardToConsumerForInstructionId(instructionId, value);
+ return;
+ }
+
+ // Handle the case if there are multiple instruction ids.
+ HashSet<String> instructionIds = new HashSet<>();
+ for (BeamFnApi.Elements.Data data : value.getDataList()) {
+ instructionIds.add(data.getInstructionId());
+ }
+ for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
+ instructionIds.add(timers.getInstructionId());
+ }
+ for (String instructionId : instructionIds) {
+ BeamFnApi.Elements.Builder builder = BeamFnApi.Elements.newBuilder();
+ for (BeamFnApi.Elements.Data data : value.getDataList()) {
+ if (instructionId.equals(data.getInstructionId())) {
+ builder.addData(data);
+ }
+ }
+ for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
+ if (instructionId.equals(timers.getInstructionId())) {
+ builder.addTimers(timers);
+ }
+ }
+ forwardToConsumerForInstructionId(instructionId, builder.build());
+ }
+ }
+
+ private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) {
+ if (erroredInstructionIds.containsKey(instructionId)) {
+ LOG.debug("Ignoring inbound data for failed instruction {}", instructionId);
+ return;
+ }
+ CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture =
+ receiverFuture(instructionId);
+ if (!consumerFuture.isDone()) {
+ LOG.debug(
+ "Received data for instruction {} without consumer ready. "
+ + "Waiting for consumer to be registered.",
+ instructionId);
+ }
+ CloseableFnDataReceiver<BeamFnApi.Elements> consumer;
+ try {
+ consumer = consumerFuture.get();
+
+ /*
+ * TODO: On failure we should fail any bundles that were impacted eagerly
+ * instead of relying on the Runner harness to do all the failure handling.
+ */
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error(
+ "Client interrupted during handling of data for instruction {}", instructionId, e);
+ outboundObserver.onError(e);
+ return;
+ } catch (RuntimeException e) {
+ LOG.error("Client failed to handle data for instruction {}", instructionId, e);
+ outboundObserver.onError(e);
+ return;
+ }
+ try {
+ consumer.accept(value);
+ } catch (Exception e) {
+ erroredInstructionIds.put(instructionId, true);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.error(
+ "Failed to handle for {}",
+ apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor,
+ t);
+ outboundObserver.onCompleted();
+ ;
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.warn(
+ "Hanged up for {}.",
+ apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor);
+ outboundObserver.onCompleted();
+ ;
+ }
+ }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
new file mode 100644
index 0000000..887f650
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.CancellableQueue;
+
+/**
+ * Decodes {@link BeamFnApi.Elements} partitioning them using the provided {@link DataEndpoint}s and
+ * {@link TimerEndpoint}s.
+ *
+ * <p>Note that this receiver uses a queue to buffer and pass elements from one thread to be
+ * processed by the thread which invokes {@link #awaitCompletion}.
+ *
+ * <p>Closing the receiver will unblock any upstream producer and downstream consumer exceptionally.
+ */
+public class BeamFnDataInboundObserver2 implements CloseableFnDataReceiver<BeamFnApi.Elements> {
+
+ /**
+ * Creates a receiver that is able to consume elements multiplexing on to the provided set of
+ * endpoints.
+ */
+ public static BeamFnDataInboundObserver2 forConsumers(
+ List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
+ return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints);
+ }
+
+ /** Holds the status of whether the endpoint has been completed or not. */
+ private static class EndpointStatus<T> {
+ final T endpoint;
+ boolean isDone;
+
+ EndpointStatus(T endpoint) {
+ this.endpoint = endpoint;
+ }
+ }
+
+ private final Map<String, EndpointStatus<DataEndpoint<?>>> transformIdToDataEndpoint;
+ private final Map<String, Map<String, EndpointStatus<TimerEndpoint<?>>>>
+ transformIdToTimerFamilyIdToTimerEndpoint;
+ private final CancellableQueue<BeamFnApi.Elements> queue;
+ private final int totalNumEndpoints;
+ private int numEndpointsThatAreIncomplete;
+
+ private BeamFnDataInboundObserver2(
+ List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
+ this.transformIdToDataEndpoint = new HashMap<>();
+ for (DataEndpoint<?> endpoint : dataEndpoints) {
+ transformIdToDataEndpoint.put(endpoint.getTransformId(), new EndpointStatus<>(endpoint));
+ }
+ this.transformIdToTimerFamilyIdToTimerEndpoint = new HashMap<>();
+ for (TimerEndpoint<?> endpoint : timerEndpoints) {
+ transformIdToTimerFamilyIdToTimerEndpoint
+ .computeIfAbsent(endpoint.getTransformId(), unused -> new HashMap<>())
+ .put(endpoint.getTimerFamilyId(), new EndpointStatus<>(endpoint));
+ }
+ this.queue = new CancellableQueue<>(100);
+ this.totalNumEndpoints = dataEndpoints.size() + timerEndpoints.size();
+ this.numEndpointsThatAreIncomplete = totalNumEndpoints;
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements elements) throws Exception {
+ queue.put(elements);
+ }
+
+ @Override
+ public void flush() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws Exception {
+ queue.cancel(new IllegalStateException("Inbound observer closed."));
+ }
+
+ /**
+ * Uses the callers thread to process all elements received until we receive the end of the stream
+ * from the upstream producer for all endpoints specified.
+ *
+ * <p>Erroneous elements passed from the producer will be visible to the caller of this method.
+ */
+ public void awaitCompletion() throws Exception {
+ try {
+ while (true) {
+ BeamFnApi.Elements elements = queue.take();
+ for (BeamFnApi.Elements.Data data : elements.getDataList()) {
+ EndpointStatus<DataEndpoint<?>> endpoint =
+ transformIdToDataEndpoint.get(data.getTransformId());
+ if (endpoint == null) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to find inbound data receiver for instruction %s and transform %s.",
+ data.getInstructionId(), data.getTransformId()));
+ } else if (endpoint.isDone) {
+ throw new IllegalStateException(
+ String.format(
+ "Received data after inbound data receiver is done for instruction %s and transform %s.",
+ data.getInstructionId(), data.getTransformId()));
+ }
+ InputStream inputStream = data.getData().newInput();
+ Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
+ FnDataReceiver<Object> receiver =
+ (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
+ while (inputStream.available() > 0) {
+ receiver.accept(coder.decode(inputStream));
+ }
+ if (data.getIsLast()) {
+ endpoint.isDone = true;
+ numEndpointsThatAreIncomplete -= 1;
+ if (numEndpointsThatAreIncomplete == 0) {
+ return;
+ }
+ }
+ }
+
+ for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
+ Map<String, EndpointStatus<TimerEndpoint<?>>> timerFamilyIdToEndpoints =
+ transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId());
+ if (timerFamilyIdToEndpoints == null) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.",
+ timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
+ }
+ EndpointStatus<TimerEndpoint<?>> endpoint =
+ timerFamilyIdToEndpoints.get(timers.getTimerFamilyId());
+ if (endpoint == null) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.",
+ timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
+ } else if (endpoint.isDone) {
+ throw new IllegalStateException(
+ String.format(
+ "Received timer after inbound timer receiver is done for instruction %s, transform %s, and timer family %s.",
+ timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
+ }
+ InputStream inputStream = timers.getTimers().newInput();
+ Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
+ FnDataReceiver<Object> receiver =
+ (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
+ while (inputStream.available() > 0) {
+ receiver.accept(coder.decode(inputStream));
+ }
+ if (timers.getIsLast()) {
+ numEndpointsThatAreIncomplete -= 1;
+ if (numEndpointsThatAreIncomplete == 0) {
+ return;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ queue.cancel(e);
+ throw e;
+ } finally {
+ close();
+ }
+ }
+
+ /** Enables this receiver to be used again for another bundle. */
+ public void reset() {
+ numEndpointsThatAreIncomplete = totalNumEndpoints;
+ for (EndpointStatus<?> value : transformIdToDataEndpoint.values()) {
+ value.isDone = false;
+ }
+ for (Map<String, EndpointStatus<TimerEndpoint<?>>> value :
+ transformIdToTimerFamilyIdToTimerEndpoint.values()) {
+ for (EndpointStatus<?> status : value.values()) {
+ status.isDone = false;
+ }
+ }
+ queue.reset();
+ }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java
new file mode 100644
index 0000000..e789cb1
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+
+@AutoValue
+public abstract class DataEndpoint<T> {
+ public static <T> DataEndpoint<T> create(
+ String transformId, Coder<T> coder, FnDataReceiver<T> receiver) {
+ return new AutoValue_DataEndpoint<>(transformId, coder, receiver);
+ }
+
+ public abstract String getTransformId();
+
+ public abstract Coder<T> getCoder();
+
+ public abstract FnDataReceiver<T> getReceiver();
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java
new file mode 100644
index 0000000..f32503a
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+
+@AutoValue
+public abstract class TimerEndpoint<T> {
+ public static <T> TimerEndpoint<T> create(
+ String transformId, String timerFamilyId, Coder<T> coder, FnDataReceiver<T> receiver) {
+ return new AutoValue_TimerEndpoint<>(transformId, timerFamilyId, coder, receiver);
+ }
+
+ public abstract String getTransformId();
+
+ public abstract String getTimerFamilyId();
+
+ public abstract Coder<T> getCoder();
+
+ public abstract FnDataReceiver<T> getReceiver();
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
new file mode 100644
index 0000000..c517995
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Tests for {@link BeamFnDataGrpcMultiplexer2}. */
+public class BeamFnDataGrpcMultiplexer2Test {
+
+ private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
+ Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
+ private static final String DATA_INSTRUCTION_ID = "dataInstructionId";
+ private static final String TIMER_INSTRUCTION_ID = "timerInstructionId";
+ private static final BeamFnApi.Elements ELEMENTS =
+ BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionId(DATA_INSTRUCTION_ID)
+ .setTransformId("dataTransformId")
+ .setData(ByteString.copyFrom(new byte[1])))
+ .addTimers(
+ BeamFnApi.Elements.Timers.newBuilder()
+ .setInstructionId(TIMER_INSTRUCTION_ID)
+ .setTransformId("timerTransformId")
+ .setTimerFamilyId("timerFamilyId")
+ .setTimers(ByteString.copyFrom(new byte[2])))
+ .build();
+ private static final BeamFnApi.Elements TERMINAL_ELEMENTS =
+ BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionId(DATA_INSTRUCTION_ID)
+ .setTransformId("dataTransformId")
+ .setIsLast(true))
+ .addTimers(
+ BeamFnApi.Elements.Timers.newBuilder()
+ .setInstructionId(TIMER_INSTRUCTION_ID)
+ .setTransformId("timerTransformId")
+ .setTimerFamilyId("timerFamilyId")
+ .setIsLast(true))
+ .build();
+
+ @Rule
+ public final TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+ @Test
+ public void testOutboundObserver() {
+ Collection<BeamFnApi.Elements> values = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer2 multiplexer =
+ new BeamFnDataGrpcMultiplexer2(
+ DESCRIPTOR,
+ OutboundObserverFactory.clientDirect(),
+ inboundObserver -> TestStreams.withOnNext(values::add).build());
+ multiplexer.getOutboundObserver().onNext(ELEMENTS);
+ assertThat(values, contains(ELEMENTS));
+ }
+
+ @Test
+ public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
+ Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer2 multiplexer =
+ new BeamFnDataGrpcMultiplexer2(
+ DESCRIPTOR,
+ OutboundObserverFactory.clientDirect(),
+ inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+ Future<?> registerFuture =
+ executor.submit(
+ () -> {
+ multiplexer.registerConsumer(
+ DATA_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ dataInboundValues.add(input);
+ }
+ });
+ multiplexer.registerConsumer(
+ TIMER_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ timerInboundValues.add(input);
+ }
+ });
+ });
+
+ multiplexer.getInboundObserver().onNext(ELEMENTS);
+ assertTrue(multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
+ assertTrue(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
+
+ // Verify that on terminal elements we still wait to be unregistered.
+ multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
+ assertTrue(multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
+ assertTrue(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
+
+ registerFuture.get();
+ multiplexer.unregisterConsumer(DATA_INSTRUCTION_ID);
+ multiplexer.unregisterConsumer(TIMER_INSTRUCTION_ID);
+ assertFalse(multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
+ assertFalse(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
+
+ // Assert that normal and terminal Elements are passed to the consumer
+ assertThat(
+ dataInboundValues,
+ contains(
+ ELEMENTS.toBuilder().clearTimers().build(),
+ TERMINAL_ELEMENTS.toBuilder().clearTimers().build()));
+ assertThat(
+ timerInboundValues,
+ contains(
+ ELEMENTS.toBuilder().clearData().build(),
+ TERMINAL_ELEMENTS.toBuilder().clearData().build()));
+ }
+
+ @Test
+ public void testElementsNeedsPartitioning() throws Exception {
+ Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer2 multiplexer =
+ new BeamFnDataGrpcMultiplexer2(
+ DESCRIPTOR,
+ OutboundObserverFactory.clientDirect(),
+ inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+ multiplexer.registerConsumer(
+ DATA_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ dataInboundValues.add(input);
+ }
+ });
+ multiplexer.registerConsumer(
+ TIMER_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ timerInboundValues.add(input);
+ }
+ });
+
+ multiplexer.getInboundObserver().onNext(ELEMENTS);
+ multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
+
+ // Assert that elements are partitioned based upon the instruction id.
+ assertThat(
+ dataInboundValues,
+ contains(
+ ELEMENTS.toBuilder().clearTimers().build(),
+ TERMINAL_ELEMENTS.toBuilder().clearTimers().build()));
+ assertThat(
+ timerInboundValues,
+ contains(
+ ELEMENTS.toBuilder().clearData().build(),
+ TERMINAL_ELEMENTS.toBuilder().clearData().build()));
+ }
+
+ @Test
+ public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception {
+ Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer2 multiplexer =
+ new BeamFnDataGrpcMultiplexer2(
+ DESCRIPTOR,
+ OutboundObserverFactory.clientDirect(),
+ inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+ multiplexer.registerConsumer(
+ DATA_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ dataInboundValues.add(input);
+ }
+ });
+
+ BeamFnApi.Elements value = ELEMENTS.toBuilder().clearTimers().build();
+
+ multiplexer.getInboundObserver().onNext(value);
+
+ // Assert that we passed the same instance through.
+ assertSame(Iterables.getOnlyElement(dataInboundValues), value);
+ }
+
+ @Test
+ public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception {
+ Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer2 multiplexer =
+ new BeamFnDataGrpcMultiplexer2(
+ DESCRIPTOR,
+ OutboundObserverFactory.clientDirect(),
+ inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+ multiplexer.registerConsumer(
+ DATA_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ if (dataInboundValues.size() == 1) {
+ throw new Exception("processing failed");
+ }
+ dataInboundValues.add(input);
+ }
+ });
+
+ BeamFnApi.Elements.Data.Builder data =
+ BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID);
+
+ multiplexer
+ .getInboundObserver()
+ .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build());
+ multiplexer
+ .getInboundObserver()
+ .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("B").build()).build());
+ multiplexer
+ .getInboundObserver()
+ .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("C").build()).build());
+
+ // Assert that we ignored the other two elements
+ assertThat(
+ dataInboundValues,
+ contains(
+ BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build()));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+ Collection<Throwable> errorWasReturned = new ArrayList<>();
+ AtomicBoolean wasClosed = new AtomicBoolean();
+ final BeamFnDataGrpcMultiplexer2 multiplexer =
+ new BeamFnDataGrpcMultiplexer2(
+ DESCRIPTOR,
+ OutboundObserverFactory.clientDirect(),
+ inboundObserver ->
+ TestStreams.withOnNext(outboundValues::add)
+ .withOnError(errorWasReturned::add)
+ .build());
+ multiplexer.registerConsumer(
+ DATA_INSTRUCTION_ID,
+ new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+ @Override
+ public void flush() throws Exception {
+ fail("Unexpected call");
+ }
+
+ @Override
+ public void close() throws Exception {
+ wasClosed.set(true);
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements input) throws Exception {
+ fail("Unexpected call");
+ }
+ });
+
+ multiplexer.close();
+
+ assertTrue(wasClosed.get());
+ assertThat(
+ Iterables.getOnlyElement(errorWasReturned).getMessage(),
+ containsString("Multiplexer hanging up"));
+ }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
new file mode 100644
index 0000000..8b2b679
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataInboundObserver2}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataInboundObserver2Test {
+ private static final Coder<WindowedValue<String>> CODER =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+ private static final String TRANSFORM_ID = "transformId";
+ private static final String TIMER_FAMILY_ID = "timerFamilyId";
+
+ @Rule
+ public final TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+ @Test
+ public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throws Exception {
+ Thread thread = Thread.currentThread();
+ Collection<WindowedValue<String>> values = new ArrayList<>();
+ Collection<WindowedValue<String>> timers = new ArrayList<>();
+ BeamFnDataInboundObserver2 observer =
+ BeamFnDataInboundObserver2.forConsumers(
+ Arrays.asList(
+ DataEndpoint.create(
+ TRANSFORM_ID,
+ CODER,
+ (value) -> {
+ assertSame(thread, Thread.currentThread());
+ values.add(value);
+ })),
+ Arrays.asList(
+ TimerEndpoint.create(
+ TRANSFORM_ID,
+ TIMER_FAMILY_ID,
+ CODER,
+ (value) -> {
+ assertSame(thread, Thread.currentThread());
+ timers.add(value);
+ })));
+
+ Future<?> future =
+ executor.submit(
+ () -> {
+ // Test decoding multiple messages
+ observer.accept(dataWith("ABC", "DEF", "GHI"));
+ observer.accept(lastData());
+ observer.accept(timerWith("UVW"));
+ observer.accept(timerWith("XYZ"));
+ observer.accept(lastTimer());
+ return null;
+ });
+
+ observer.awaitCompletion();
+ assertThat(
+ values,
+ contains(
+ valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
+ assertThat(timers, contains(valueInGlobalWindow("UVW"), valueInGlobalWindow("XYZ")));
+ future.get();
+ }
+
+ @Test
+ public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer()
+ throws Exception {
+ BeamFnDataInboundObserver2 observer =
+ BeamFnDataInboundObserver2.forConsumers(
+ Arrays.asList(
+ DataEndpoint.create(
+ TRANSFORM_ID,
+ CODER,
+ (value) -> {
+ throw new Exception("test consumer failed");
+ })),
+ Collections.emptyList());
+
+ Future<?> future =
+ executor.submit(
+ () -> {
+ observer.accept(dataWith("ABC"));
+ assertThrows(
+ "test consumer failed",
+ Exception.class,
+ () -> {
+ while (true) {
+ // keep trying to send messages since the queue buffers messages and the
+ // consumer
+ // may have not yet noticed the bad state.
+ observer.accept(dataWith("ABC"));
+ }
+ });
+ return null;
+ });
+
+ assertThrows("test consumer failed", Exception.class, () -> observer.awaitCompletion());
+ future.get();
+ }
+
+ @Test
+ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exception {
+ BeamFnDataInboundObserver2 observer =
+ BeamFnDataInboundObserver2.forConsumers(
+ Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
+ Collections.emptyList());
+
+ Future<?> future =
+ executor.submit(
+ () -> {
+ observer.accept(dataWith("ABC"));
+ assertThrows(
+ "Inbound observer closed",
+ IllegalStateException.class,
+ () -> {
+ while (true) {
+ // keep trying to send messages since the queue buffers messages and the
+ // consumer
+ // may have not yet noticed the bad state.
+ observer.accept(dataWith("ABC"));
+ }
+ });
+ return null;
+ });
+ Future<?> future2 =
+ executor.submit(
+ () -> {
+ observer.close();
+ return null;
+ });
+
+ assertThrows(
+ "Inbound observer closed", IllegalStateException.class, () -> observer.awaitCompletion());
+ future.get();
+ future2.get();
+ }
+
+ @Test
+ public void testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer()
+ throws Exception {
+ BeamFnDataInboundObserver2 observer =
+ BeamFnDataInboundObserver2.forConsumers(
+ Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
+ Collections.emptyList());
+ Future<?> future =
+ executor.submit(
+ () -> {
+ observer.accept(timerWith("DEF"));
+ assertThrows(
+ "Unable to find inbound timer receiver for instruction",
+ IllegalStateException.class,
+ () -> {
+ // keep trying to send messages since the queue buffers messages and the
+ // consumer
+ // may have not yet noticed the bad state.
+ while (true) {
+ observer.accept(dataWith("ABC"));
+ }
+ });
+ return null;
+ });
+
+ assertThrows(
+ "Unable to find inbound timer receiver for instruction",
+ IllegalStateException.class,
+ () -> observer.awaitCompletion());
+ future.get();
+ }
+
+ private BeamFnApi.Elements dataWith(String... values) throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (String value : values) {
+ CODER.encode(valueInGlobalWindow(value), output);
+ }
+ return BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setTransformId(TRANSFORM_ID)
+ .setData(output.toByteString()))
+ .build();
+ }
+
+ private BeamFnApi.Elements lastData() throws Exception {
+ return BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder().setTransformId(TRANSFORM_ID).setIsLast(true))
+ .build();
+ }
+
+ private BeamFnApi.Elements timerWith(String... values) throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (String value : values) {
+ CODER.encode(valueInGlobalWindow(value), output);
+ }
+ return BeamFnApi.Elements.newBuilder()
+ .addTimers(
+ BeamFnApi.Elements.Timers.newBuilder()
+ .setTransformId(TRANSFORM_ID)
+ .setTimerFamilyId(TIMER_FAMILY_ID)
+ .setTimers(output.toByteString()))
+ .build();
+ }
+
+ private BeamFnApi.Elements lastTimer() throws Exception {
+ return BeamFnApi.Elements.newBuilder()
+ .addTimers(
+ BeamFnApi.Elements.Timers.newBuilder()
+ .setTransformId(TRANSFORM_ID)
+ .setTimerFamilyId(TIMER_FAMILY_ID)
+ .setIsLast(true))
+ .build();
+ }
+}