You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/30 20:48:55 UTC
[4/6] beam git commit: A proposal for a portability framework to
execute user definable functions.
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
new file mode 100644
index 0000000..14e26f0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import com.google.protobuf.Message;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler and datastore for types that can be registered via the Fn API.
+ *
+ * <p>Allows for {@link org.apache.beam.fn.v1.BeamFnApi.RegisterRequest}s to occur in parallel with
+ * subsequent requests that may look up registered values by blocking lookups until registration
+ * occurs.
+ */
+public class RegisterHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RegisterHandler.class);
+ private final ConcurrentMap<Long, CompletableFuture<Message>> idToObject;
+
+ public RegisterHandler() {
+ idToObject = new ConcurrentHashMap<>();
+ }
+
+ public <T extends Message> T getById(long id) {
+ try {
+ @SuppressWarnings("unchecked")
+ CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id);
+ /*
+ * TODO: Even though the register request instruction occurs before the process bundle
+ * instruction in the control stream, the instructions are being processed in parallel
+ * in the Java harness causing a data race which is why we use a future. This will block
+ * forever in the case of a runner having a bug sending the wrong ids. Instead of blocking
+ * forever, we could do a timed wait or come up with another way of ordering the instruction
+ * processing to remove the data race.
+ */
+ return returnValue.get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(String.format("Failed to load %s", id), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(String.format("Failed to load %s", id), e);
+ }
+ }
+
+ public BeamFnApi.InstructionResponse.Builder register(BeamFnApi.InstructionRequest request) {
+ BeamFnApi.InstructionResponse.Builder response = BeamFnApi.InstructionResponse.newBuilder()
+ .setRegister(RegisterResponse.getDefaultInstance());
+
+ BeamFnApi.RegisterRequest registerRequest = request.getRegister();
+ for (BeamFnApi.ProcessBundleDescriptor processBundleDescriptor
+ : registerRequest.getProcessBundleDescriptorList()) {
+ LOGGER.debug("Registering {} with type {}",
+ processBundleDescriptor.getId(),
+ processBundleDescriptor.getClass());
+ computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
+ for (BeamFnApi.Coder coder : processBundleDescriptor.getCodersList()) {
+ LOGGER.debug("Registering {} with type {}",
+ coder.getFunctionSpec().getId(),
+ coder.getClass());
+ computeIfAbsent(coder.getFunctionSpec().getId()).complete(coder);
+ }
+ }
+
+ return response;
+ }
+
+ private CompletableFuture<Message> computeIfAbsent(long id) {
+ return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
new file mode 100644
index 0000000..6535555
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Control service client and individual request handlers.
+ */
+package org.apache.beam.fn.harness.control;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
new file mode 100644
index 0000000..3bf44ab
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A buffering outbound {@link Consumer} for the Beam Fn Data API.
+ *
+ * <p>Encodes individually consumed elements with the provided {@link Coder} producing
+ * a single {@link org.apache.beam.fn.v1.BeamFnApi.Elements} message when the buffer threshold
+ * is surpassed.
+ *
+ * <p>The default buffer threshold can be overridden by specifying the experiment
+ * {@code beam_fn_api_data_buffer_limit=<bytes>}
+ *
+ * <p>TODO: Handle outputting large elements (> 2 GiB). Note that this also applies to the
+ * input side.
+ *
+ * <p>TODO: Handle outputting elements that are zero bytes by outputting a single byte as
+ * a marker, detect on the input side that no bytes were read and force reading a single byte.
+ */
+public class BeamFnDataBufferingOutboundObserver<T>
+ implements CloseableThrowingConsumer<WindowedValue<T>> {
+ private static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
+ private static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);
+
+ private long byteCounter;
+ private long counter;
+ private final int bufferLimit;
+ private final Coder<WindowedValue<T>> coder;
+ private final KV<Long, BeamFnApi.Target> outputLocation;
+ private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+ private final ByteString.Output bufferedElements;
+
+ public BeamFnDataBufferingOutboundObserver(
+ PipelineOptions options,
+ KV<Long, BeamFnApi.Target> outputLocation,
+ Coder<WindowedValue<T>> coder,
+ StreamObserver<BeamFnApi.Elements> outboundObserver) {
+ this.bufferLimit = getBufferLimit(options);
+ this.outputLocation = outputLocation;
+ this.coder = coder;
+ this.outboundObserver = outboundObserver;
+ this.bufferedElements = ByteString.newOutput();
+ }
+
+ /**
+ * Returns the {@code beam_fn_api_data_buffer_limit=<bytes>} experiment value if set. Otherwise
+ * returns the default buffer limit.
+ */
+ private static int getBufferLimit(PipelineOptions options) {
+ List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+ for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
+ if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
+ return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length()));
+ }
+ }
+ return DEFAULT_BUFFER_LIMIT_BYTES;
+ }
+
+ @Override
+ public void close() throws Exception {
+ BeamFnApi.Elements.Builder elements = convertBufferForTransmission();
+ // This will add an empty data block representing the end of stream.
+ elements.addDataBuilder()
+ .setInstructionReference(outputLocation.getKey())
+ .setTarget(outputLocation.getValue());
+
+ LOGGER.debug("Closing stream for instruction {} and "
+ + "target {} having transmitted {} values {} bytes",
+ outputLocation.getKey(),
+ outputLocation.getValue(),
+ counter,
+ byteCounter);
+ outboundObserver.onNext(elements.build());
+ }
+
+ @Override
+ public void accept(WindowedValue<T> t) throws IOException {
+ coder.encode(t, bufferedElements, Context.NESTED);
+ counter += 1;
+ if (bufferedElements.size() >= bufferLimit) {
+ outboundObserver.onNext(convertBufferForTransmission().build());
+ }
+ }
+
+ private BeamFnApi.Elements.Builder convertBufferForTransmission() {
+ BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
+ if (bufferedElements.size() == 0) {
+ return elements;
+ }
+
+ elements.addDataBuilder()
+ .setInstructionReference(outputLocation.getKey())
+ .setTarget(outputLocation.getValue())
+ .setData(bufferedElements.toByteString());
+
+ byteCounter += bufferedElements.size();
+ bufferedElements.reset();
+ return elements;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
new file mode 100644
index 0000000..27b1acb
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.data;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * The {@link BeamFnDataClient} is able to forward inbound elements to a consumer and is also a
+ * consumer of outbound elements. Callers can register themselves as consumers for inbound elements
+ * or can get a handle for a consumer for outbound elements.
+ */
+public interface BeamFnDataClient {
+ /**
+ * Registers the following inbound consumer for the provided instruction id and target.
+ *
+ * <p>The provided coder is used to decode inbound elements. The decoded elements
+ * are passed to the provided consumer. Any failure during decoding or processing of the element
+ * will complete the returned future exceptionally. On successful termination of the stream,
+ * the returned future is completed successfully.
+ *
+ * <p>The consumer is not required to be thread safe.
+ */
+ <T> CompletableFuture<Void> forInboundConsumer(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+ KV<Long, BeamFnApi.Target> inputLocation,
+ Coder<WindowedValue<T>> coder,
+ ThrowingConsumer<WindowedValue<T>> consumer);
+
+ /**
+ * Creates a closeable consumer using the provided instruction id and target.
+ *
+ * <p>The provided coder is used to encode elements on the outbound stream.
+ *
+ * <p>Closing the returned consumer signals the end of the stream.
+ *
+ * <p>The returned closeable consumer is not thread safe.
+ */
+ <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+ KV<Long, BeamFnApi.Target> outputLocation,
+ Coder<WindowedValue<T>> coder);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
new file mode 100644
index 0000000..9bbdc78
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.data;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnDataGrpc;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that uses gRPC for sending and receiving data.
+ *
+ * <p>TODO: Handle closing clients that are currently neither a consumer nor being consumed.
+ */
+public class BeamFnDataGrpcClient implements BeamFnDataClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
+
+ private final ConcurrentMap<BeamFnApi.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache;
+ private final Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory;
+ private final BiFunction<Function<StreamObserver<BeamFnApi.Elements>,
+ StreamObserver<BeamFnApi.Elements>>,
+ StreamObserver<BeamFnApi.Elements>,
+ StreamObserver<BeamFnApi.Elements>> streamObserverFactory;
+ private final PipelineOptions options;
+
+ public BeamFnDataGrpcClient(
+ PipelineOptions options,
+ Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+ BiFunction<Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>>,
+ StreamObserver<BeamFnApi.Elements>,
+ StreamObserver<BeamFnApi.Elements>> streamObserverFactory) {
+ this.options = options;
+ this.channelFactory = channelFactory;
+ this.streamObserverFactory = streamObserverFactory;
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Registers the following inbound stream consumer for the provided instruction id and target.
+ *
+ * <p>The provided coder is used to decode elements on the inbound stream. The decoded elements
+ * are passed to the provided consumer. Any failure during decoding or processing of the element
+ * will complete the returned future exceptionally. On successful termination of the stream
+ * (signaled by an empty data block), the returned future is completed successfully.
+ */
+ @Override
+ public <T> CompletableFuture<Void> forInboundConsumer(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+ KV<Long, BeamFnApi.Target> inputLocation,
+ Coder<WindowedValue<T>> coder,
+ ThrowingConsumer<WindowedValue<T>> consumer) {
+ LOGGER.debug("Registering consumer instruction {} for target {}",
+ inputLocation.getKey(),
+ inputLocation.getValue());
+
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+ BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
+ client.futureForKey(inputLocation).complete(
+ new BeamFnDataInboundObserver<>(coder, consumer, readFuture));
+ return readFuture;
+ }
+
+ /**
+ * Creates a closeable consumer using the provided instruction id and target.
+ *
+ * <p>The provided coder is used to encode elements on the outbound stream.
+ *
+ * <p>On closing the returned consumer, an empty data block is sent as a signal of the
+ * logical data stream finishing.
+ *
+ * <p>The returned closeable consumer is not thread safe.
+ */
+ @Override
+ public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+ KV<Long, BeamFnApi.Target> outputLocation,
+ Coder<WindowedValue<T>> coder) {
+ BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
+
+ return new BeamFnDataBufferingOutboundObserver<>(
+ options, outputLocation, coder, client.getOutboundObserver());
+ }
+
+ private BeamFnDataGrpcMultiplexer getClientFor(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor) {
+ return cache.computeIfAbsent(apiServiceDescriptor,
+ (BeamFnApi.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer(
+ descriptor,
+ (StreamObserver<BeamFnApi.Elements> inboundObserver) -> streamObserverFactory.apply(
+ BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data,
+ inboundObserver)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
new file mode 100644
index 0000000..ea059df
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A gRPC multiplexer for a specific {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor}.
+ *
+ * <p>Multiplexes data for inbound consumers based upon their individual
+ * {@link org.apache.beam.fn.v1.BeamFnApi.Target}s.
+ *
+ * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those
+ * streams. For inbound streams, this is as thread safe as the inbound observers. For outbound
+ * streams, this is as thread safe as the underlying stream observer.
+ *
+ * <p>TODO: Add support for multiplexing over multiple outbound observers by pinning
+ * each output location to a specific outbound observer.
+ */
+public class BeamFnDataGrpcMultiplexer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
+ private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+ private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+ private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+ @VisibleForTesting
+ final ConcurrentMap<KV<Long, BeamFnApi.Target>,
+ CompletableFuture<Consumer<BeamFnApi.Elements.Data>>> consumers;
+
+ public BeamFnDataGrpcMultiplexer(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+ Function<StreamObserver<BeamFnApi.Elements>,
+ StreamObserver<BeamFnApi.Elements>> outboundObserverFactory) {
+ this.apiServiceDescriptor = apiServiceDescriptor;
+ this.consumers = new ConcurrentHashMap<>();
+ this.inboundObserver = new InboundObserver();
+ this.outboundObserver = outboundObserverFactory.apply(inboundObserver);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("apiServiceDescriptor", apiServiceDescriptor)
+ .add("consumers", consumers)
+ .toString();
+ }
+
+ public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
+ return inboundObserver;
+ }
+
+ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
+ return outboundObserver;
+ }
+
+ public CompletableFuture<Consumer<BeamFnApi.Elements.Data>> futureForKey(
+ KV<Long, BeamFnApi.Target> key) {
+ return consumers.computeIfAbsent(
+ key,
+ (KV<Long, BeamFnApi.Target> providedKey) -> new CompletableFuture<>());
+ }
+
+ /**
+ * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass
+ * the elements to.
+ *
+ * <p>The inbound observer blocks until the {@link Consumer} is bound, allowing the
+ * sending harness to start transmitting data without needing the receiving harness to
+ * signal that it is ready to consume that data.
+ */
+ private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> {
+ @Override
+ public void onNext(BeamFnApi.Elements value) {
+ for (BeamFnApi.Elements.Data data : value.getDataList()) {
+ try {
+ KV<Long, BeamFnApi.Target> key =
+ KV.of(data.getInstructionReference(), data.getTarget());
+ futureForKey(key).get().accept(data);
+ if (data.getData().isEmpty()) {
+ consumers.remove(key);
+ }
+ /*
+ * TODO: On failure we should fail any bundles that were impacted eagerly
+ * instead of relying on the Runner harness to do all the failure handling.
+ */
+ } catch (ExecutionException | InterruptedException e) {
+ LOGGER.error(
+ "Client interrupted during handling of data for instruction {} and target {}",
+ data.getInstructionReference(),
+ data.getTarget(),
+ e);
+ outboundObserver.onError(e);
+ } catch (RuntimeException e) {
+ LOGGER.error(
+ "Client failed to handle data for instruction {} and target {}",
+ data.getInstructionReference(),
+ data.getTarget(),
+ e);
+ outboundObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOGGER.error("Failed to handle for {}", apiServiceDescriptor, t);
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.warn("Hanged up for {}.", apiServiceDescriptor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
new file mode 100644
index 0000000..f8b5ab8
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decodes individually consumed {@link org.apache.beam.fn.v1.BeamFnApi.Elements.Data} with the
+ * provided {@link Coder} passing the individual decoded elements to the provided consumer.
+ */
+public class BeamFnDataInboundObserver<T> implements Consumer<BeamFnApi.Elements.Data> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataInboundObserver.class);
+ private final ThrowingConsumer<WindowedValue<T>> consumer;
+ private final Coder<WindowedValue<T>> coder;
+ private final CompletableFuture<Void> readFuture;
+ private long byteCounter;
+ private long counter;
+
+ public BeamFnDataInboundObserver(
+ Coder<WindowedValue<T>> coder,
+ ThrowingConsumer<WindowedValue<T>> consumer,
+ CompletableFuture<Void> readFuture) {
+ this.coder = coder;
+ this.consumer = consumer;
+ this.readFuture = readFuture;
+ }
+
+ @Override
+ public void accept(BeamFnApi.Elements.Data t) {
+ if (readFuture.isDone()) {
+ // Drop any incoming data if the stream processing has finished.
+ return;
+ }
+ try {
+ if (t.getData().isEmpty()) {
+ LOGGER.debug("Closing stream for instruction {} and "
+ + "target {} having consumed {} values {} bytes",
+ t.getInstructionReference(),
+ t.getTarget(),
+ counter,
+ byteCounter);
+ readFuture.complete(null);
+ return;
+ }
+
+ byteCounter += t.getData().size();
+ InputStream inputStream = t.getData().newInput();
+ while (inputStream.available() > 0) {
+ counter += 1;
+ WindowedValue<T> value = coder.decode(inputStream, Context.NESTED);
+ consumer.accept(value);
+ }
+ } catch (Exception e) {
+ readFuture.completeExceptionally(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java
new file mode 100644
index 0000000..edaaa65
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Data service client and logical stream multiplexing.
+ */
+package org.apache.beam.fn.harness.data;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
new file mode 100644
index 0000000..b3b7f48
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fake;
+
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * A fake implementation of an {@link AggregatorFactory} that is to be filled in at a later time.
+ * The factory returns {@link Aggregator}s that do nothing when a value is added.
+ */
+public class FakeAggregatorFactory implements AggregatorFactory {
+ @Override
+ public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+ Class<?> fnClass,
+ StepContext stepContext,
+ String aggregatorName,
+ CombineFn<InputT, AccumT, OutputT> combine) {
+ return new Aggregator<InputT, OutputT>() {
+ @Override
+ public void addValue(InputT value) {}
+
+ @Override
+ public String getName() {
+ return aggregatorName;
+ }
+
+ @Override
+ public CombineFn<InputT, ?, OutputT> getCombineFn() {
+ return combine;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
new file mode 100644
index 0000000..84da059
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fake;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A fake {@link StepContext} whose methods are no-ops or placeholders.
+ *
+ * <p>The name accessors return the placeholder {@code "TODO"}, the output notifications and
+ * {@link #writePCollectionViewData} do nothing, and access to state or timers is rejected with an
+ * {@link UnsupportedOperationException}.
+ */
+public class FakeStepContext implements StepContext {
+  /** Placeholder returned for both the step name and the transform name. */
+  private static final String PLACEHOLDER_NAME = "TODO";
+
+  /** Returns the placeholder name {@code "TODO"}. */
+  @Override
+  public String getStepName() {
+    return PLACEHOLDER_NAME;
+  }
+
+  /** Returns the placeholder name {@code "TODO"}. */
+  @Override
+  public String getTransformName() {
+    return PLACEHOLDER_NAME;
+  }
+
+  /** Does nothing. */
+  @Override
+  public void noteOutput(WindowedValue<?> output) {
+  }
+
+  /** Does nothing. */
+  @Override
+  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+  }
+
+  /** Does nothing; the supplied view data is not written anywhere. */
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder) throws IOException {
+  }
+
+  /**
+   * Always fails because this fake has no state support.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public StateInternals<?> stateInternals() {
+    throw new UnsupportedOperationException(
+        "FakeStepContext does not provide StateInternals");
+  }
+
+  /**
+   * Always fails because this fake has no timer support.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public TimerInternals timerInternals() {
+    throw new UnsupportedOperationException(
+        "FakeStepContext does not provide TimerInternals");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java
new file mode 100644
index 0000000..cd6eb02
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Fake implementations of bindings used with runners-core.
+ */
+package org.apache.beam.fn.harness.fake;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
new file mode 100644
index 0000000..59ab149
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+/** A {@link ThrowingConsumer} that can be closed.
+ *
+ * <p>This interface declares no methods of its own; it only combines {@link ThrowingConsumer}
+ * with {@link AutoCloseable}, so implementations can be managed by try-with-resources.
+ *
+ * @param <T> the type of value passed to the inherited {@code accept} method
+ */
+public interface CloseableThrowingConsumer<T> extends AutoCloseable, ThrowingConsumer<T> {
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
new file mode 100644
index 0000000..9d505da
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+import java.util.function.BiFunction;
+
+/**
+ * A {@link BiFunction} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ *
+ * <p>Note that this interface does not extend {@link BiFunction}; it declares its own single
+ * abstract method, {@code apply}, whose signature permits checked exceptions.
+ *
+ * @param <T1> the type of the first argument to {@code apply}
+ * @param <T2> the type of the second argument to {@code apply}
+ * @param <T3> the type of the result of {@code apply}
+ */
+@FunctionalInterface
+public interface ThrowingBiFunction<T1, T2, T3> {
+ T3 apply(T1 t1, T2 t2) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
new file mode 100644
index 0000000..b34e857
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+import java.util.function.Consumer;
+
+/**
+ * A {@link Consumer} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ *
+ * <p>Note that this interface does not extend {@link Consumer}; it declares its own single
+ * abstract method, {@code accept}, whose signature permits checked exceptions.
+ *
+ * @param <T> the type of the value passed to {@code accept}
+ */
+@FunctionalInterface
+public interface ThrowingConsumer<T> {
+ void accept(T t) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
new file mode 100644
index 0000000..446ff60
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+import java.util.function.Function;
+
+/**
+ * A {@link Function} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ *
+ * <p>Note that this interface does not extend {@link Function}; it declares its own single
+ * abstract method, {@code apply}, whose signature permits checked exceptions.
+ *
+ * @param <T1> the type of the argument to {@code apply}
+ * @param <T2> the type of the result of {@code apply}
+ */
+@FunctionalInterface
+public interface ThrowingFunction<T1, T2> {
+ T2 apply(T1 value) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
new file mode 100644
index 0000000..c7fc29e
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+/**
+ * A {@link Runnable} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ *
+ * <p>Note that this interface does not extend {@link Runnable}; it declares its own single
+ * abstract method, {@code run}, whose signature permits checked exceptions.
+ */
+@FunctionalInterface
+public interface ThrowingRunnable {
+ void run() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
new file mode 100644
index 0000000..bbf3396
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java 8 functional interface extensions.
+ */
+package org.apache.beam.fn.harness.fn;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
new file mode 100644
index 0000000..d74d9fa
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.logging;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Timestamp;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API.
+ *
+ * <p>Constructing a client resets the global {@link LogManager}, applies the log levels found in
+ * {@link DataflowWorkerLoggingOptions} and installs a root {@link Handler} which buffers log
+ * entries and streams them to the logging service from a background task. Closing the client
+ * drains the buffer, hangs up with the service, restores the default logging configuration and
+ * shuts the channel down.
+ */
+public class BeamFnLoggingClient implements AutoCloseable {
+  private static final String ROOT_LOGGER_NAME = "";
+
+  /**
+   * Maps {@link java.util.logging} levels onto Fn API severities. Records whose level has no
+   * entry here (for example {@link Level#CONFIG} and {@link Level#FINER}) are dropped by the
+   * handler.
+   */
+  private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity> LOG_LEVEL_MAP =
+      ImmutableMap.<Level, BeamFnApi.LogEntry.Severity>builder()
+          .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.ERROR)
+          .put(Level.WARNING, BeamFnApi.LogEntry.Severity.WARN)
+          .put(Level.INFO, BeamFnApi.LogEntry.Severity.INFO)
+          .put(Level.FINE, BeamFnApi.LogEntry.Severity.DEBUG)
+          .put(Level.FINEST, BeamFnApi.LogEntry.Severity.TRACE)
+          .build();
+
+  /** Maps the levels used by the logging pipeline options onto {@link java.util.logging}. */
+  private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
+      ImmutableMap.<DataflowWorkerLoggingOptions.Level, Level>builder()
+          .put(DataflowWorkerLoggingOptions.Level.OFF, Level.OFF)
+          .put(DataflowWorkerLoggingOptions.Level.ERROR, Level.SEVERE)
+          .put(DataflowWorkerLoggingOptions.Level.WARN, Level.WARNING)
+          .put(DataflowWorkerLoggingOptions.Level.INFO, Level.INFO)
+          .put(DataflowWorkerLoggingOptions.Level.DEBUG, Level.FINE)
+          .put(DataflowWorkerLoggingOptions.Level.TRACE, Level.FINEST)
+          .build();
+
+  private static final Formatter FORMATTER = new SimpleFormatter();
+
+  /* Used to signal to a thread processing a queue to finish its work gracefully. */
+  private static final BeamFnApi.LogEntry POISON_PILL =
+      BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.MIN_VALUE).build();
+
+  /**
+   * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB,
+   * this represents a buffer of about 10 MiBs.
+   */
+  private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10_000;
+
+  /* We need to store a reference to the configured loggers so that they are not
+   * garbage collected. java.util.logging only has weak references to the loggers
+   * so if they are garbage collected, our hierarchical configuration will be lost. */
+  private final Collection<Logger> configuredLoggers;
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final ManagedChannel channel;
+  private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+  private final LogControlObserver inboundObserver;
+  private final LogRecordHandler logRecordHandler;
+  private final CompletableFuture<Object> inboundObserverCompletion;
+
+  /**
+   * Creates the client and installs it as the only handler of the root logger.
+   *
+   * @param options pipeline options; read as {@link DataflowWorkerLoggingOptions} for the log
+   *     levels and as {@link GcsOptions} for the executor which runs the log writer
+   * @param apiServiceDescriptor describes the logging service to connect to
+   * @param channelFactory creates the channel for the given descriptor
+   * @param streamObserverFactory invoked with the stub's {@code logging} method and the inbound
+   *     observer; its result is used as the outbound observer for log entries
+   */
+  public BeamFnLoggingClient(
+      PipelineOptions options,
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      BiFunction<Function<StreamObserver<BeamFnApi.LogControl>,
+                          StreamObserver<BeamFnApi.LogEntry.List>>,
+                 StreamObserver<BeamFnApi.LogControl>,
+                 StreamObserver<BeamFnApi.LogEntry.List>> streamObserverFactory) {
+    this.apiServiceDescriptor = apiServiceDescriptor;
+    this.inboundObserverCompletion = new CompletableFuture<>();
+    this.configuredLoggers = new ArrayList<>();
+    this.channel = channelFactory.apply(apiServiceDescriptor);
+
+    // Reset the global log manager, get the root logger and remove the default log handlers.
+    LogManager logManager = LogManager.getLogManager();
+    logManager.reset();
+    Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
+    for (Handler handler : rootLogger.getHandlers()) {
+      rootLogger.removeHandler(handler);
+    }
+
+    // Use the passed in logging options to configure the various logger levels.
+    DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
+    if (loggingOptions.getDefaultWorkerLogLevel() != null) {
+      rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultWorkerLogLevel()));
+    }
+
+    if (loggingOptions.getWorkerLogLevelOverrides() != null) {
+      for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
+          loggingOptions.getWorkerLogLevelOverrides().entrySet()) {
+        Logger logger = Logger.getLogger(loggerOverride.getKey());
+        logger.setLevel(LEVEL_CONFIGURATION.get(loggerOverride.getValue()));
+        configuredLoggers.add(logger);
+      }
+    }
+
+    BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
+    inboundObserver = new LogControlObserver();
+    logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService());
+    logRecordHandler.setLevel(Level.ALL);
+    outboundObserver = streamObserverFactory.apply(stub::logging, inboundObserver);
+    // The handler is registered last so that no record is published before the outbound
+    // observer exists.
+    rootLogger.addHandler(logRecordHandler);
+  }
+
+  /**
+   * Drains buffered log entries, hangs up with the server and restores the logging configuration.
+   *
+   * <p>The logging configuration is reset and the channel is shut down even when hanging up or
+   * waiting for the server fails, so a failed close does not leak the channel.
+   *
+   * @throws Exception if the log writer or the inbound stream failed, or if the calling thread
+   *     was interrupted
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      // Hang up with the server
+      logRecordHandler.close();
+
+      // Wait for the server to hang up
+      inboundObserverCompletion.get();
+    } finally {
+      try {
+        // Reset the logging configuration to what it is at startup
+        for (Logger logger : configuredLoggers) {
+          logger.setLevel(null);
+        }
+        configuredLoggers.clear();
+        LogManager.getLogManager().readConfiguration();
+      } finally {
+        // Shut the channel down
+        channel.shutdown();
+        if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
+          channel.shutdownNow();
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(BeamFnLoggingClient.class)
+        .add("apiServiceDescriptor", apiServiceDescriptor)
+        .toString();
+  }
+
+  /**
+   * A {@link Handler} which converts {@link LogRecord}s into log entries, buffers them in a
+   * bounded queue and, as a {@link Runnable} submitted to the executor, forwards them in batches
+   * to the outbound observer.
+   */
+  private class LogRecordHandler extends Handler implements Runnable {
+    private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries =
+        new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+    private final Future<?> bufferedLogWriter;
+    private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;
+
+    private LogRecordHandler(ExecutorService executorService) {
+      // The thread local must exist before the writer task is submitted because run() uses it
+      // immediately and may start before this constructor returns.
+      logEntryHandler = new ThreadLocal<>();
+      bufferedLogWriter = executorService.submit(this);
+    }
+
+    /**
+     * Converts the record into a log entry and enqueues it. Records whose level has no severity
+     * mapping are ignored.
+     */
+    @Override
+    public void publish(LogRecord record) {
+      BeamFnApi.LogEntry.Severity severity = LOG_LEVEL_MAP.get(record.getLevel());
+      if (severity == null) {
+        return;
+      }
+      BeamFnApi.LogEntry.Builder builder = BeamFnApi.LogEntry.newBuilder()
+          .setSeverity(severity)
+          .setLogLocation(record.getLoggerName())
+          .setMessage(FORMATTER.formatMessage(record))
+          .setThread(Integer.toString(record.getThreadID()))
+          .setTimestamp(Timestamp.newBuilder()
+              .setSeconds(record.getMillis() / 1000)
+              .setNanos((int) (record.getMillis() % 1000) * 1_000_000));
+      if (record.getThrown() != null) {
+        builder.setTrace(getStackTraceAsString(record.getThrown()));
+      }
+      // The thread that sends log records should never perform a blocking publish and
+      // only insert log records best effort. We detect which thread is logging
+      // by using the thread local, defaulting to the blocking publish.
+      MoreObjects.firstNonNull(
+          logEntryHandler.get(), this::blockingPublish).accept(builder.build());
+    }
+
+    /** Blocks caller till enough space exists to publish this log entry. */
+    private void blockingPublish(BeamFnApi.LogEntry logEntry) {
+      try {
+        bufferedLogEntries.put(logEntry);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Forwards buffered log entries to the outbound observer in batches until the poison pill is
+     * encountered. Entries queued behind the poison pill are not sent.
+     */
+    @Override
+    public void run() {
+      // Logging which occurs in this thread will attempt to publish log entries into the
+      // above handler which should never block if the queue is full otherwise
+      // this thread will get stuck.
+      logEntryHandler.set(bufferedLogEntries::offer);
+      List<BeamFnApi.LogEntry> additionalLogEntries =
+          new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+      try {
+        boolean shutdownRequested = false;
+        while (!shutdownRequested) {
+          BeamFnApi.LogEntry logEntry = bufferedLogEntries.take();
+          if (logEntry == POISON_PILL) {
+            break;
+          }
+          BeamFnApi.LogEntry.List.Builder builder =
+              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+          // Start every batch from an empty list so that entries which were already sent in a
+          // previous batch are not sent again.
+          additionalLogEntries.clear();
+          bufferedLogEntries.drainTo(additionalLogEntries);
+          for (BeamFnApi.LogEntry additionalLogEntry : additionalLogEntries) {
+            if (additionalLogEntry == POISON_PILL) {
+              // The poison pill was drained together with regular entries: send what precedes
+              // it and then stop instead of waiting for another pill.
+              shutdownRequested = true;
+              break;
+            }
+            builder.addLogEntries(additionalLogEntry);
+          }
+          outboundObserver.onNext(builder.build());
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    /**
+     * Asks the log writer to finish, waits for it and then completes the outbound stream.
+     */
+    @Override
+    public void close() {
+      synchronized (outboundObserver) {
+        // If we are done, then a previous caller has already shutdown the queue processing thread
+        // hence we don't need to do it again.
+        if (!bufferedLogWriter.isDone()) {
+          // We check to see if we were able to successfully insert the poison pill at the end of
+          // the queue forcing the remainder of the elements to be processed or if the processing
+          // thread is done.
+          try {
+            // The order of these checks is important because short circuiting will cause us to
+            // insert into the queue first and only if it fails do we check that the thread is done.
+            while (!bufferedLogEntries.offer(POISON_PILL, 60, TimeUnit.SECONDS)
+                && !bufferedLogWriter.isDone()) {
+              // The queue stayed full and the writer is still running: try again.
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+          waitTillFinish();
+        }
+        outboundObserver.onCompleted();
+      }
+    }
+
+    /** Blocks until the log writer has finished, ignoring its cancellation. */
+    private void waitTillFinish() {
+      try {
+        bufferedLogWriter.get();
+      } catch (CancellationException e) {
+        // Ignore cancellations
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Observes the inbound control stream. Control messages are ignored; the termination of the
+   * stream is recorded in {@code inboundObserverCompletion}.
+   */
+  private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> {
+    @Override
+    public void onNext(BeamFnApi.LogControl value) {
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      inboundObserverCompletion.completeExceptionally(t);
+    }
+
+    @Override
+    public void onCompleted() {
+      inboundObserverCompletion.complete(null);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java
new file mode 100644
index 0000000..7a4d0a8
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logging service client and JUL logging handler adapter.
+ */
+package org.apache.beam.fn.harness.logging;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java
new file mode 100644
index 0000000..58080e4
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Top level package for Beam Java Fn Harness.
+ */
+package org.apache.beam.fn.harness;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
new file mode 100644
index 0000000..2007139
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import java.util.concurrent.Phaser;
+
+/**
+ * A {@link Phaser} which never terminates. A plain {@link Phaser} terminates once an advance
+ * leaves it with zero registered parties; this subclass keeps advancing regardless.
+ */
+public final class AdvancingPhaser extends Phaser {
+  public AdvancingPhaser(int numParties) {
+    super(numParties);
+  }
+
+  @Override
+  protected boolean onAdvance(int completedPhase, int remainingParties) {
+    return false; // false means "do not terminate", whatever the party count is
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
new file mode 100644
index 0000000..cda3a4b
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing
+ * thread responsible for interacting with the underlying {@link CallStreamObserver}.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}.
+ * The processing thread waits for the phase to advance while the observer is not ready, so
+ * callers are expected to advance the {@link Phaser} whenever the observer becomes ready.
+ */
+@ThreadSafe
+public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+  /** Sentinel that tells the drainer to stop; matched by identity and never forwarded. */
+  private static final Object POISON_PILL = new Object();
+  private final LinkedBlockingDeque<T> queue;
+  private final Phaser phaser;
+  private final CallStreamObserver<T> outboundObserver;
+  private final Future<?> queueDrainer;
+  private final int bufferSize;
+
+  /**
+   * Creates the observer and immediately submits the queue draining task to {@code executor}.
+   * @param phaser awaited by the draining task while {@code outboundObserver} is not ready
+   * @param outboundObserver receives every element taken off the queue
+   * @param bufferSize capacity of the bounded queue
+   */
+  public BufferingStreamObserver(
+      Phaser phaser, CallStreamObserver<T> outboundObserver,
+      ExecutorService executor, int bufferSize) {
+    this.phaser = phaser;
+    this.bufferSize = bufferSize;
+    this.queue = new LinkedBlockingDeque<>(bufferSize);
+    this.outboundObserver = outboundObserver;
+    this.queueDrainer = executor.submit(this::drainQueue);
+  }
+
+  /** Forwards queued elements while the outbound observer is ready; ends at the poison pill. */
+  private void drainQueue() {
+    try {
+      while (true) {
+        int currentPhase = phaser.getPhase();
+        while (outboundObserver.isReady()) {
+          T value = queue.take();
+          if (value == POISON_PILL) {
+            return;
+          }
+          outboundObserver.onNext(value);
+        }
+        phaser.awaitAdvance(currentPhase);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** Enqueues {@code value}, blocking while the queue is full and the drainer is alive. */
+  @Override
+  public void onNext(T value) {
+    try {
+      // Attempt to add an element to the bounded queue occasionally checking to see
+      // if the queue drainer is still alive.
+      while (!queue.offer(value, 60, TimeUnit.SECONDS)) {
+        checkState(!queueDrainer.isDone(), "Stream observer has finished.");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Stops the drainer eagerly (pill at the head of the queue), then forwards {@code t}. */
+  @Override
+  public void onError(Throwable t) {
+    synchronized (outboundObserver) {
+      stopDrainer(true);
+      outboundObserver.onError(t);
+    }
+  }
+
+  /** Lets the drainer flush queued elements (pill at the tail), then forwards completion. */
+  @Override
+  public void onCompleted() {
+    synchronized (outboundObserver) {
+      stopDrainer(false);
+      outboundObserver.onCompleted();
+    }
+  }
+
+  /**
+   * Enqueues the poison pill and waits for the drainer to end; does nothing if it already has.
+   * @param atFront {@code true} to jump ahead of queued elements, {@code false} to go after them
+   */
+  @SuppressWarnings("unchecked") // The pill is only ever compared by identity in drainQueue().
+  private void stopDrainer(boolean atFront) {
+    if (queueDrainer.isDone()) {
+      return;
+    }
+    T pill = (T) POISON_PILL;
+    try {
+      // Stop once the pill is enqueued. The drainer's state is consulted only after a timed-out
+      // attempt: if it has already ended, no one will free up space and retrying is pointless.
+      boolean enqueued;
+      do {
+        enqueued = atFront
+            ? queue.offerFirst(pill, 60, TimeUnit.SECONDS)
+            : queue.offer(pill, 60, TimeUnit.SECONDS);
+      } while (!enqueued && !queueDrainer.isDone());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+    waitTillFinish();
+  }
+
+  @VisibleForTesting
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  /** Blocks until the drainer ends, rethrowing its failure as a {@link RuntimeException}. */
+  private void waitTillFinish() {
+    try {
+      queueDrainer.get();
+    } catch (CancellationException e) {
+      // Cancellation is expected
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
new file mode 100644
index 0000000..82a1aa4
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.Phaser;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A {@link StreamObserver} which synchronizes on the underlying {@link CallStreamObserver}
+ * whenever it forwards a call, making it safe to use from multiple threads.
+ *
+ * <p>Flow control uses a {@link Phaser}: if the {@link CallStreamObserver} is not ready,
+ * {@link #onNext} waits for the phase to advance before forwarding. The creator is expected to
+ * advance the {@link Phaser} whenever the underlying {@link CallStreamObserver} becomes ready.
+ */
+@ThreadSafe
+public final class DirectStreamObserver<T> implements StreamObserver<T> {
+  private final Phaser phaser;
+  private final CallStreamObserver<T> outboundObserver;
+
+  public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserver) {
+    this.phaser = phaser;
+    this.outboundObserver = outboundObserver;
+  }
+
+  /** Forwards {@code value}, first awaiting a phase advance if the observer is not ready. */
+  @Override
+  public void onNext(T value) {
+    final int observedPhase = phaser.getPhase();
+    if (!outboundObserver.isReady()) {
+      phaser.awaitAdvance(observedPhase);
+    }
+    synchronized (outboundObserver) {
+      outboundObserver.onNext(value);
+    }
+  }
+
+  /** Forwards the failure to the outbound observer while holding its lock. */
+  @Override
+  public void onError(Throwable t) {
+    synchronized (outboundObserver) {
+      outboundObserver.onError(t);
+    }
+  }
+
+  /** Forwards completion to the outbound observer while holding its lock. */
+  @Override
+  public void onCompleted() {
+    synchronized (outboundObserver) {
+      outboundObserver.onCompleted();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
new file mode 100644
index 0000000..ef641b0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * A {@link ClientResponseObserver} that forwards all {@link StreamObserver} calls to a delegate.
+ *
+ * <p>It exists so that an existing {@link StreamObserver} can be used while also installing an
+ * {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}.
+ *
+ * <p>This is as thread-safe as the underlying stream observer that is being wrapped.
+ */
+final class ForwardingClientResponseObserver<ReqT, RespT>
+    implements ClientResponseObserver<RespT, ReqT> {
+  private final StreamObserver<ReqT> inboundObserver;
+  private final Runnable onReadyHandler;
+
+  ForwardingClientResponseObserver(StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler) {
+    this.onReadyHandler = onReadyHandler;
+    this.inboundObserver = inboundObserver;
+  }
+
+  /** Registers the on-ready handler on {@code stream}. */
+  @Override
+  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
+    stream.setOnReadyHandler(onReadyHandler);
+  }
+
+  @Override
+  public void onNext(ReqT value) {
+    inboundObserver.onNext(value);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    inboundObserver.onError(t);
+  }
+
+  @Override
+  public void onCompleted() {
+    inboundObserver.onCompleted();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
new file mode 100644
index 0000000..9326e11
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Chooses, based on {@link PipelineOptions}, which {@link StreamObserver} implementation to use.
+ */
+public abstract class StreamObserverFactory {
+  /** Picks the buffered implementation when its experiment is enabled, else the direct one. */
+  public static StreamObserverFactory fromOptions(PipelineOptions options) {
+    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    if (experiments == null || !experiments.contains("beam_fn_api_buffered_stream")) {
+      return new Direct();
+    }
+    String sizePrefix = "beam_fn_api_buffered_stream_buffer_size=";
+    int bufferSize = Buffered.DEFAULT_BUFFER_SIZE;
+    for (String experiment : experiments) {
+      if (experiment.startsWith(sizePrefix)) {
+        bufferSize = Integer.parseInt(experiment.substring(sizePrefix.length()));
+      }
+    }
+    return new Buffered(options.as(GcsOptions.class).getExecutorService(), bufferSize);
+  }
+
+  /** Returns a flow-controlled observer around the one {@code clientFactory} produces. */
+  public abstract <ReqT, RespT> StreamObserver<RespT> from(
+      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+      StreamObserver<ReqT> responseObserver);
+
+  private static class Direct extends StreamObserverFactory {
+    @Override
+    public <ReqT, RespT> StreamObserver<RespT> from(
+        Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+        StreamObserver<ReqT> inboundObserver) {
+      AdvancingPhaser readiness = new AdvancingPhaser(1);
+      StreamObserver<ReqT> wrappedInbound =
+          new ForwardingClientResponseObserver<ReqT, RespT>(inboundObserver, readiness::arrive);
+      CallStreamObserver<RespT> outbound =
+          (CallStreamObserver<RespT>) clientFactory.apply(wrappedInbound);
+      return new DirectStreamObserver<>(readiness, outbound);
+    }
+  }
+
+  private static class Buffered extends StreamObserverFactory {
+    private static final int DEFAULT_BUFFER_SIZE = 64;
+    private final ExecutorService executorService;
+    private final int bufferSize;
+
+    private Buffered(ExecutorService executorService, int bufferSize) {
+      this.executorService = executorService;
+      this.bufferSize = bufferSize;
+    }
+
+    @Override
+    public <ReqT, RespT> StreamObserver<RespT> from(
+        Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+        StreamObserver<ReqT> inboundObserver) {
+      AdvancingPhaser readiness = new AdvancingPhaser(1);
+      StreamObserver<ReqT> wrappedInbound =
+          new ForwardingClientResponseObserver<ReqT, RespT>(inboundObserver, readiness::arrive);
+      CallStreamObserver<RespT> outbound =
+          (CallStreamObserver<RespT>) clientFactory.apply(wrappedInbound);
+      return new BufferingStreamObserver<>(readiness, outbound, executorService, bufferSize);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java
new file mode 100644
index 0000000..df4042c
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC stream management.
+ */
+package org.apache.beam.fn.harness.stream;