You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/10/22 01:41:31 UTC

[beam] branch master updated: [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6173abb  [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747)
6173abb is described below

commit 6173abb2e241865d89dff9ae679f4d422be84ee9
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Thu Oct 21 18:40:10 2021 -0700

    [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747)
    
    * [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id.
    
    * fixup! checkstyle
    
    * Update sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
    
    Co-authored-by: Brian Hulette <hu...@gmail.com>
    
    * fixup! Address PR comments.
    
    Co-authored-by: Brian Hulette <hu...@gmail.com>
---
 .../sdk/fn/data/BeamFnDataGrpcMultiplexer2.java    | 274 ++++++++++++++++
 .../sdk/fn/data/BeamFnDataInboundObserver2.java    | 196 +++++++++++
 .../org/apache/beam/sdk/fn/data/DataEndpoint.java  |  35 ++
 .../org/apache/beam/sdk/fn/data/TimerEndpoint.java |  37 +++
 .../fn/data/BeamFnDataGrpcMultiplexer2Test.java    | 363 +++++++++++++++++++++
 .../fn/data/BeamFnDataInboundObserver2Test.java    | 249 ++++++++++++++
 6 files changed, 1154 insertions(+)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
new file mode 100644
index 0000000..c00db8c
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.CancellableQueue;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}.
+ *
+ * <p>Multiplexes data for inbound consumers based upon their {@code instructionId}.
+ *
+ * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams.
+ * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this
+ * is as thread safe as the underlying stream observer.
+ *
+ * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
+ * location with a specific outbound observer.
+ */
+public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
+  private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
+  private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+  private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+  private final ConcurrentMap<
+          /*instructionId=*/ String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>>
+      receivers;
+  private final ConcurrentMap<String, Boolean> erroredInstructionIds;
+  private final List<CancellableQueue<BeamFnApi.Elements>> unusedQueues;
+
+  public BeamFnDataGrpcMultiplexer2(
+      Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
+      OutboundObserverFactory outboundObserverFactory,
+      OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, BeamFnApi.Elements>
+          baseOutboundObserverFactory) {
+    this.apiServiceDescriptor = apiServiceDescriptor;
+    this.receivers = new ConcurrentHashMap<>();
+    this.inboundObserver = new InboundObserver();
+    this.outboundObserver =
+        outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver);
+    this.erroredInstructionIds = new ConcurrentHashMap<>();
+    this.unusedQueues = new ArrayList<>(100);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .omitNullValues()
+        .add("apiServiceDescriptor", apiServiceDescriptor)
+        .add("consumers", receivers)
+        .toString();
+  }
+
+  public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
+    return inboundObserver;
+  }
+
+  public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
+    return outboundObserver;
+  }
+
+  private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(
+      String instructionId) {
+    return receivers.computeIfAbsent(instructionId, (unused) -> new CompletableFuture<>());
+  }
+
+  /**
+   * Registers a consumer for the specified intruction id.
+   *
+   * <p>The {@link BeamFnDataGrpcMultiplexer2} partitions {@link BeamFnApi.Elements} with multiple
+   * instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a
+   * single instruction id.
+   *
+   * <p>The caller must {@link #unregisterConsumer unregister the consumer} when they no longer wish
+   * to receive messages.
+   */
+  public void registerConsumer(
+      String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
+    receiverFuture(instructionId).complete(receiver);
+  }
+
+  /** Unregisters a consumer. */
+  public void unregisterConsumer(String instructionId) {
+    receivers.remove(instructionId);
+  }
+
+  @VisibleForTesting
+  boolean hasConsumer(String instructionId) {
+    return receivers.containsKey(instructionId);
+  }
+
+  @Override
+  public void close() throws Exception {
+    Exception exception = null;
+    for (CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiver :
+        ImmutableList.copyOf(receivers.values())) {
+      // Cancel any observer waiting for the client to complete. If the receiver has already been
+      // completed or cancelled, this call will be ignored.
+      receiver.cancel(true);
+      if (!receiver.isCompletedExceptionally()) {
+        try {
+          receiver.get().close();
+        } catch (Exception e) {
+          if (exception == null) {
+            exception = e;
+          } else {
+            exception.addSuppressed(e);
+          }
+        }
+      }
+    }
+    // Cancel any outbound calls and complete any inbound calls, as this multiplexer is hanging up
+    outboundObserver.onError(
+        Status.CANCELLED.withDescription("Multiplexer hanging up").asException());
+    inboundObserver.onCompleted();
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  /**
+   * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass the
+   * elements to.
+   *
+   * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the sending
+   * harness to initiate transmitting data without needing for the receiving harness to signal that
+   * it is ready to consume that data.
+   */
+  private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> {
+    @Override
+    public void onNext(BeamFnApi.Elements value) {
+      // Have a fast path to handle the common case and provide a short circuit to exit if we detect
+      // multiple instruction ids.
+      SINGLE_INSTRUCTION_ID:
+      {
+        String instructionId = null;
+        for (BeamFnApi.Elements.Data data : value.getDataList()) {
+          if (instructionId == null) {
+            instructionId = data.getInstructionId();
+          } else if (!instructionId.equals(data.getInstructionId())) {
+            // Multiple instruction ids detected, break out of this block
+            break SINGLE_INSTRUCTION_ID;
+          }
+        }
+        for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
+          if (instructionId == null) {
+            instructionId = timers.getInstructionId();
+          } else if (!instructionId.equals(timers.getInstructionId())) {
+            // Multiple instruction ids detected, break out of this block
+            break SINGLE_INSTRUCTION_ID;
+          }
+        }
+        if (instructionId == null) {
+          return;
+        }
+        forwardToConsumerForInstructionId(instructionId, value);
+        return;
+      }
+
+      // Handle the case if there are multiple instruction ids.
+      HashSet<String> instructionIds = new HashSet<>();
+      for (BeamFnApi.Elements.Data data : value.getDataList()) {
+        instructionIds.add(data.getInstructionId());
+      }
+      for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
+        instructionIds.add(timers.getInstructionId());
+      }
+      for (String instructionId : instructionIds) {
+        BeamFnApi.Elements.Builder builder = BeamFnApi.Elements.newBuilder();
+        for (BeamFnApi.Elements.Data data : value.getDataList()) {
+          if (instructionId.equals(data.getInstructionId())) {
+            builder.addData(data);
+          }
+        }
+        for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
+          if (instructionId.equals(timers.getInstructionId())) {
+            builder.addTimers(timers);
+          }
+        }
+        forwardToConsumerForInstructionId(instructionId, builder.build());
+      }
+    }
+
+    private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) {
+      if (erroredInstructionIds.containsKey(instructionId)) {
+        LOG.debug("Ignoring inbound data for failed instruction {}", instructionId);
+        return;
+      }
+      CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture =
+          receiverFuture(instructionId);
+      if (!consumerFuture.isDone()) {
+        LOG.debug(
+            "Received data for instruction {} without consumer ready. "
+                + "Waiting for consumer to be registered.",
+            instructionId);
+      }
+      CloseableFnDataReceiver<BeamFnApi.Elements> consumer;
+      try {
+        consumer = consumerFuture.get();
+
+        /*
+         * TODO: On failure we should fail any bundles that were impacted eagerly
+         * instead of relying on the Runner harness to do all the failure handling.
+         */
+      } catch (ExecutionException | InterruptedException e) {
+        LOG.error(
+            "Client interrupted during handling of data for instruction {}", instructionId, e);
+        outboundObserver.onError(e);
+        return;
+      } catch (RuntimeException e) {
+        LOG.error("Client failed to handle data for instruction {}", instructionId, e);
+        outboundObserver.onError(e);
+        return;
+      }
+      try {
+        consumer.accept(value);
+      } catch (Exception e) {
+        erroredInstructionIds.put(instructionId, true);
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      LOG.error(
+          "Failed to handle for {}",
+          apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor,
+          t);
+      outboundObserver.onCompleted();
+      ;
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.warn(
+          "Hanged up for {}.",
+          apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor);
+      outboundObserver.onCompleted();
+      ;
+    }
+  }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
new file mode 100644
index 0000000..887f650
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.CancellableQueue;
+
+/**
+ * Decodes {@link BeamFnApi.Elements} partitioning them using the provided {@link DataEndpoint}s and
+ * {@link TimerEndpoint}s.
+ *
+ * <p>Note that this receiver uses a queue to buffer and pass elements from one thread to be
+ * processed by the thread which invokes {@link #awaitCompletion}.
+ *
+ * <p>Closing the receiver will unblock any upstream producer and downstream consumer exceptionally.
+ */
+public class BeamFnDataInboundObserver2 implements CloseableFnDataReceiver<BeamFnApi.Elements> {
+
+  /**
+   * Creates a receiver that is able to consume elements multiplexing on to the provided set of
+   * endpoints.
+   */
+  public static BeamFnDataInboundObserver2 forConsumers(
+      List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
+    return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints);
+  }
+
+  /** Holds the status of whether the endpoint has been completed or not. */
+  private static class EndpointStatus<T> {
+    final T endpoint;
+    boolean isDone;
+
+    EndpointStatus(T endpoint) {
+      this.endpoint = endpoint;
+    }
+  }
+
+  private final Map<String, EndpointStatus<DataEndpoint<?>>> transformIdToDataEndpoint;
+  private final Map<String, Map<String, EndpointStatus<TimerEndpoint<?>>>>
+      transformIdToTimerFamilyIdToTimerEndpoint;
+  private final CancellableQueue<BeamFnApi.Elements> queue;
+  private final int totalNumEndpoints;
+  private int numEndpointsThatAreIncomplete;
+
+  private BeamFnDataInboundObserver2(
+      List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
+    this.transformIdToDataEndpoint = new HashMap<>();
+    for (DataEndpoint<?> endpoint : dataEndpoints) {
+      transformIdToDataEndpoint.put(endpoint.getTransformId(), new EndpointStatus<>(endpoint));
+    }
+    this.transformIdToTimerFamilyIdToTimerEndpoint = new HashMap<>();
+    for (TimerEndpoint<?> endpoint : timerEndpoints) {
+      transformIdToTimerFamilyIdToTimerEndpoint
+          .computeIfAbsent(endpoint.getTransformId(), unused -> new HashMap<>())
+          .put(endpoint.getTimerFamilyId(), new EndpointStatus<>(endpoint));
+    }
+    this.queue = new CancellableQueue<>(100);
+    this.totalNumEndpoints = dataEndpoints.size() + timerEndpoints.size();
+    this.numEndpointsThatAreIncomplete = totalNumEndpoints;
+  }
+
+  @Override
+  public void accept(BeamFnApi.Elements elements) throws Exception {
+    queue.put(elements);
+  }
+
+  @Override
+  public void flush() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws Exception {
+    queue.cancel(new IllegalStateException("Inbound observer closed."));
+  }
+
+  /**
+   * Uses the callers thread to process all elements received until we receive the end of the stream
+   * from the upstream producer for all endpoints specified.
+   *
+   * <p>Erroneous elements passed from the producer will be visible to the caller of this method.
+   */
+  public void awaitCompletion() throws Exception {
+    try {
+      while (true) {
+        BeamFnApi.Elements elements = queue.take();
+        for (BeamFnApi.Elements.Data data : elements.getDataList()) {
+          EndpointStatus<DataEndpoint<?>> endpoint =
+              transformIdToDataEndpoint.get(data.getTransformId());
+          if (endpoint == null) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to find inbound data receiver for instruction %s and transform %s.",
+                    data.getInstructionId(), data.getTransformId()));
+          } else if (endpoint.isDone) {
+            throw new IllegalStateException(
+                String.format(
+                    "Received data after inbound data receiver is done for instruction %s and transform %s.",
+                    data.getInstructionId(), data.getTransformId()));
+          }
+          InputStream inputStream = data.getData().newInput();
+          Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
+          FnDataReceiver<Object> receiver =
+              (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
+          while (inputStream.available() > 0) {
+            receiver.accept(coder.decode(inputStream));
+          }
+          if (data.getIsLast()) {
+            endpoint.isDone = true;
+            numEndpointsThatAreIncomplete -= 1;
+            if (numEndpointsThatAreIncomplete == 0) {
+              return;
+            }
+          }
+        }
+
+        for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
+          Map<String, EndpointStatus<TimerEndpoint<?>>> timerFamilyIdToEndpoints =
+              transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId());
+          if (timerFamilyIdToEndpoints == null) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.",
+                    timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
+          }
+          EndpointStatus<TimerEndpoint<?>> endpoint =
+              timerFamilyIdToEndpoints.get(timers.getTimerFamilyId());
+          if (endpoint == null) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.",
+                    timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
+          } else if (endpoint.isDone) {
+            throw new IllegalStateException(
+                String.format(
+                    "Received timer after inbound timer receiver is done for instruction %s, transform %s, and timer family %s.",
+                    timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
+          }
+          InputStream inputStream = timers.getTimers().newInput();
+          Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
+          FnDataReceiver<Object> receiver =
+              (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
+          while (inputStream.available() > 0) {
+            receiver.accept(coder.decode(inputStream));
+          }
+          if (timers.getIsLast()) {
+            numEndpointsThatAreIncomplete -= 1;
+            if (numEndpointsThatAreIncomplete == 0) {
+              return;
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      queue.cancel(e);
+      throw e;
+    } finally {
+      close();
+    }
+  }
+
+  /** Enables this receiver to be used again for another bundle. */
+  public void reset() {
+    numEndpointsThatAreIncomplete = totalNumEndpoints;
+    for (EndpointStatus<?> value : transformIdToDataEndpoint.values()) {
+      value.isDone = false;
+    }
+    for (Map<String, EndpointStatus<TimerEndpoint<?>>> value :
+        transformIdToTimerFamilyIdToTimerEndpoint.values()) {
+      for (EndpointStatus<?> status : value.values()) {
+        status.isDone = false;
+      }
+    }
+    queue.reset();
+  }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java
new file mode 100644
index 0000000..e789cb1
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+
+@AutoValue
+public abstract class DataEndpoint<T> {
+  public static <T> DataEndpoint<T> create(
+      String transformId, Coder<T> coder, FnDataReceiver<T> receiver) {
+    return new AutoValue_DataEndpoint<>(transformId, coder, receiver);
+  }
+
+  public abstract String getTransformId();
+
+  public abstract Coder<T> getCoder();
+
+  public abstract FnDataReceiver<T> getReceiver();
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java
new file mode 100644
index 0000000..f32503a
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+
+@AutoValue
+public abstract class TimerEndpoint<T> {
+  public static <T> TimerEndpoint<T> create(
+      String transformId, String timerFamilyId, Coder<T> coder, FnDataReceiver<T> receiver) {
+    return new AutoValue_TimerEndpoint<>(transformId, timerFamilyId, coder, receiver);
+  }
+
+  public abstract String getTransformId();
+
+  public abstract String getTimerFamilyId();
+
+  public abstract Coder<T> getCoder();
+
+  public abstract FnDataReceiver<T> getReceiver();
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
new file mode 100644
index 0000000..c517995
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Tests for {@link BeamFnDataGrpcMultiplexer2}. */
+public class BeamFnDataGrpcMultiplexer2Test {
+
+  private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
+      Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
+  private static final String DATA_INSTRUCTION_ID = "dataInstructionId";
+  private static final String TIMER_INSTRUCTION_ID = "timerInstructionId";
+  private static final BeamFnApi.Elements ELEMENTS =
+      BeamFnApi.Elements.newBuilder()
+          .addData(
+              BeamFnApi.Elements.Data.newBuilder()
+                  .setInstructionId(DATA_INSTRUCTION_ID)
+                  .setTransformId("dataTransformId")
+                  .setData(ByteString.copyFrom(new byte[1])))
+          .addTimers(
+              BeamFnApi.Elements.Timers.newBuilder()
+                  .setInstructionId(TIMER_INSTRUCTION_ID)
+                  .setTransformId("timerTransformId")
+                  .setTimerFamilyId("timerFamilyId")
+                  .setTimers(ByteString.copyFrom(new byte[2])))
+          .build();
+  private static final BeamFnApi.Elements TERMINAL_ELEMENTS =
+      BeamFnApi.Elements.newBuilder()
+          .addData(
+              BeamFnApi.Elements.Data.newBuilder()
+                  .setInstructionId(DATA_INSTRUCTION_ID)
+                  .setTransformId("dataTransformId")
+                  .setIsLast(true))
+          .addTimers(
+              BeamFnApi.Elements.Timers.newBuilder()
+                  .setInstructionId(TIMER_INSTRUCTION_ID)
+                  .setTransformId("timerTransformId")
+                  .setTimerFamilyId("timerFamilyId")
+                  .setIsLast(true))
+          .build();
+
+  @Rule
+  public final TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+  @Test
+  public void testOutboundObserver() {
+    Collection<BeamFnApi.Elements> values = new ArrayList<>();
+    BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
+            DESCRIPTOR,
+            OutboundObserverFactory.clientDirect(),
+            inboundObserver -> TestStreams.withOnNext(values::add).build());
+    multiplexer.getOutboundObserver().onNext(ELEMENTS);
+    assertThat(values, contains(ELEMENTS));
+  }
+
+  @Test
+  public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
+    Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+    Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+    Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
+    BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
+            DESCRIPTOR,
+            OutboundObserverFactory.clientDirect(),
+            inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+    Future<?> registerFuture =
+        executor.submit(
+            () -> {
+              multiplexer.registerConsumer(
+                  DATA_INSTRUCTION_ID,
+                  new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+                    @Override
+                    public void flush() throws Exception {
+                      fail("Unexpected call");
+                    }
+
+                    @Override
+                    public void close() throws Exception {
+                      fail("Unexpected call");
+                    }
+
+                    @Override
+                    public void accept(BeamFnApi.Elements input) throws Exception {
+                      dataInboundValues.add(input);
+                    }
+                  });
+              multiplexer.registerConsumer(
+                  TIMER_INSTRUCTION_ID,
+                  new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+                    @Override
+                    public void flush() throws Exception {
+                      fail("Unexpected call");
+                    }
+
+                    @Override
+                    public void close() throws Exception {
+                      fail("Unexpected call");
+                    }
+
+                    @Override
+                    public void accept(BeamFnApi.Elements input) throws Exception {
+                      timerInboundValues.add(input);
+                    }
+                  });
+            });
+
+    multiplexer.getInboundObserver().onNext(ELEMENTS);
+    assertTrue(multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
+    assertTrue(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
+
+    // Verify that on terminal elements we still wait to be unregistered.
+    multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
+    assertTrue(multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
+    assertTrue(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
+
+    registerFuture.get();
+    multiplexer.unregisterConsumer(DATA_INSTRUCTION_ID);
+    multiplexer.unregisterConsumer(TIMER_INSTRUCTION_ID);
+    assertFalse(multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
+    assertFalse(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
+
+    // Assert that normal and terminal Elements are passed to the consumer
+    assertThat(
+        dataInboundValues,
+        contains(
+            ELEMENTS.toBuilder().clearTimers().build(),
+            TERMINAL_ELEMENTS.toBuilder().clearTimers().build()));
+    assertThat(
+        timerInboundValues,
+        contains(
+            ELEMENTS.toBuilder().clearData().build(),
+            TERMINAL_ELEMENTS.toBuilder().clearData().build()));
+  }
+
+  @Test
+  public void testElementsNeedsPartitioning() throws Exception {
+    Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+    Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+    Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
+    BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
+            DESCRIPTOR,
+            OutboundObserverFactory.clientDirect(),
+            inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+    multiplexer.registerConsumer(
+        DATA_INSTRUCTION_ID,
+        new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+          @Override
+          public void flush() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void close() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void accept(BeamFnApi.Elements input) throws Exception {
+            dataInboundValues.add(input);
+          }
+        });
+    multiplexer.registerConsumer(
+        TIMER_INSTRUCTION_ID,
+        new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+          @Override
+          public void flush() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void close() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void accept(BeamFnApi.Elements input) throws Exception {
+            timerInboundValues.add(input);
+          }
+        });
+
+    multiplexer.getInboundObserver().onNext(ELEMENTS);
+    multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
+
+    // Assert that elements are partitioned based upon the instruction id.
+    assertThat(
+        dataInboundValues,
+        contains(
+            ELEMENTS.toBuilder().clearTimers().build(),
+            TERMINAL_ELEMENTS.toBuilder().clearTimers().build()));
+    assertThat(
+        timerInboundValues,
+        contains(
+            ELEMENTS.toBuilder().clearData().build(),
+            TERMINAL_ELEMENTS.toBuilder().clearData().build()));
+  }
+
+  @Test
+  public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception {
+    Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+    Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+    BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
+            DESCRIPTOR,
+            OutboundObserverFactory.clientDirect(),
+            inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+    multiplexer.registerConsumer(
+        DATA_INSTRUCTION_ID,
+        new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+          @Override
+          public void flush() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void close() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void accept(BeamFnApi.Elements input) throws Exception {
+            dataInboundValues.add(input);
+          }
+        });
+
+    BeamFnApi.Elements value = ELEMENTS.toBuilder().clearTimers().build();
+
+    multiplexer.getInboundObserver().onNext(value);
+
+    // Assert that we passed the same instance through.
+    assertSame(Iterables.getOnlyElement(dataInboundValues), value);
+  }
+
+  @Test
+  public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception {
+    Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+    Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
+    BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
+            DESCRIPTOR,
+            OutboundObserverFactory.clientDirect(),
+            inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
+    multiplexer.registerConsumer(
+        DATA_INSTRUCTION_ID,
+        new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+          @Override
+          public void flush() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void close() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void accept(BeamFnApi.Elements input) throws Exception {
+            if (dataInboundValues.size() == 1) {
+              throw new Exception("processing failed");
+            }
+            dataInboundValues.add(input);
+          }
+        });
+
+    BeamFnApi.Elements.Data.Builder data =
+        BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID);
+
+    multiplexer
+        .getInboundObserver()
+        .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build());
+    multiplexer
+        .getInboundObserver()
+        .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("B").build()).build());
+    multiplexer
+        .getInboundObserver()
+        .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("C").build()).build());
+
+    // Assert that we ignored the other two elements
+    assertThat(
+        dataInboundValues,
+        contains(
+            BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build()));
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+    Collection<Throwable> errorWasReturned = new ArrayList<>();
+    AtomicBoolean wasClosed = new AtomicBoolean();
+    final BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
+            DESCRIPTOR,
+            OutboundObserverFactory.clientDirect(),
+            inboundObserver ->
+                TestStreams.withOnNext(outboundValues::add)
+                    .withOnError(errorWasReturned::add)
+                    .build());
+    multiplexer.registerConsumer(
+        DATA_INSTRUCTION_ID,
+        new CloseableFnDataReceiver<BeamFnApi.Elements>() {
+          @Override
+          public void flush() throws Exception {
+            fail("Unexpected call");
+          }
+
+          @Override
+          public void close() throws Exception {
+            wasClosed.set(true);
+          }
+
+          @Override
+          public void accept(BeamFnApi.Elements input) throws Exception {
+            fail("Unexpected call");
+          }
+        });
+
+    multiplexer.close();
+
+    assertTrue(wasClosed.get());
+    assertThat(
+        Iterables.getOnlyElement(errorWasReturned).getMessage(),
+        containsString("Multiplexer hanging up"));
+  }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
new file mode 100644
index 0000000..8b2b679
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataInboundObserver2}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataInboundObserver2Test {
+  private static final Coder<WindowedValue<String>> CODER =
+      WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+  private static final String TRANSFORM_ID = "transformId";
+  private static final String TIMER_FAMILY_ID = "timerFamilyId";
+
+  @Rule
+  public final TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+  @Test
+  public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throws Exception {
+    Thread thread = Thread.currentThread();
+    Collection<WindowedValue<String>> values = new ArrayList<>();
+    Collection<WindowedValue<String>> timers = new ArrayList<>();
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(
+                DataEndpoint.create(
+                    TRANSFORM_ID,
+                    CODER,
+                    (value) -> {
+                      assertSame(thread, Thread.currentThread());
+                      values.add(value);
+                    })),
+            Arrays.asList(
+                TimerEndpoint.create(
+                    TRANSFORM_ID,
+                    TIMER_FAMILY_ID,
+                    CODER,
+                    (value) -> {
+                      assertSame(thread, Thread.currentThread());
+                      timers.add(value);
+                    })));
+
+    Future<?> future =
+        executor.submit(
+            () -> {
+              // Test decoding multiple messages
+              observer.accept(dataWith("ABC", "DEF", "GHI"));
+              observer.accept(lastData());
+              observer.accept(timerWith("UVW"));
+              observer.accept(timerWith("XYZ"));
+              observer.accept(lastTimer());
+              return null;
+            });
+
+    observer.awaitCompletion();
+    assertThat(
+        values,
+        contains(
+            valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
+    assertThat(timers, contains(valueInGlobalWindow("UVW"), valueInGlobalWindow("XYZ")));
+    future.get();
+  }
+
+  @Test
+  public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer()
+      throws Exception {
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(
+                DataEndpoint.create(
+                    TRANSFORM_ID,
+                    CODER,
+                    (value) -> {
+                      throw new Exception("test consumer failed");
+                    })),
+            Collections.emptyList());
+
+    Future<?> future =
+        executor.submit(
+            () -> {
+              observer.accept(dataWith("ABC"));
+              assertThrows(
+                  "test consumer failed",
+                  Exception.class,
+                  () -> {
+                    while (true) {
+                      // keep trying to send messages since the queue buffers messages and the
+                      // consumer
+                      // may have not yet noticed the bad state.
+                      observer.accept(dataWith("ABC"));
+                    }
+                  });
+              return null;
+            });
+
+    assertThrows("test consumer failed", Exception.class, () -> observer.awaitCompletion());
+    future.get();
+  }
+
+  @Test
+  public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exception {
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
+            Collections.emptyList());
+
+    Future<?> future =
+        executor.submit(
+            () -> {
+              observer.accept(dataWith("ABC"));
+              assertThrows(
+                  "Inbound observer closed",
+                  IllegalStateException.class,
+                  () -> {
+                    while (true) {
+                      // keep trying to send messages since the queue buffers messages and the
+                      // consumer
+                      // may have not yet noticed the bad state.
+                      observer.accept(dataWith("ABC"));
+                    }
+                  });
+              return null;
+            });
+    Future<?> future2 =
+        executor.submit(
+            () -> {
+              observer.close();
+              return null;
+            });
+
+    assertThrows(
+        "Inbound observer closed", IllegalStateException.class, () -> observer.awaitCompletion());
+    future.get();
+    future2.get();
+  }
+
+  @Test
+  public void testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer()
+      throws Exception {
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
+            Collections.emptyList());
+    Future<?> future =
+        executor.submit(
+            () -> {
+              observer.accept(timerWith("DEF"));
+              assertThrows(
+                  "Unable to find inbound timer receiver for instruction",
+                  IllegalStateException.class,
+                  () -> {
+                    // keep trying to send messages since the queue buffers messages and the
+                    // consumer
+                    // may have not yet noticed the bad state.
+                    while (true) {
+                      observer.accept(dataWith("ABC"));
+                    }
+                  });
+              return null;
+            });
+
+    assertThrows(
+        "Unable to find inbound timer receiver for instruction",
+        IllegalStateException.class,
+        () -> observer.awaitCompletion());
+    future.get();
+  }
+
+  private BeamFnApi.Elements dataWith(String... values) throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (String value : values) {
+      CODER.encode(valueInGlobalWindow(value), output);
+    }
+    return BeamFnApi.Elements.newBuilder()
+        .addData(
+            BeamFnApi.Elements.Data.newBuilder()
+                .setTransformId(TRANSFORM_ID)
+                .setData(output.toByteString()))
+        .build();
+  }
+
+  private BeamFnApi.Elements lastData() throws Exception {
+    return BeamFnApi.Elements.newBuilder()
+        .addData(BeamFnApi.Elements.Data.newBuilder().setTransformId(TRANSFORM_ID).setIsLast(true))
+        .build();
+  }
+
+  private BeamFnApi.Elements timerWith(String... values) throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (String value : values) {
+      CODER.encode(valueInGlobalWindow(value), output);
+    }
+    return BeamFnApi.Elements.newBuilder()
+        .addTimers(
+            BeamFnApi.Elements.Timers.newBuilder()
+                .setTransformId(TRANSFORM_ID)
+                .setTimerFamilyId(TIMER_FAMILY_ID)
+                .setTimers(output.toByteString()))
+        .build();
+  }
+
+  private BeamFnApi.Elements lastTimer() throws Exception {
+    return BeamFnApi.Elements.newBuilder()
+        .addTimers(
+            BeamFnApi.Elements.Timers.newBuilder()
+                .setTransformId(TRANSFORM_ID)
+                .setTimerFamilyId(TIMER_FAMILY_ID)
+                .setIsLast(true))
+        .build();
+  }
+}