You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/03 19:19:58 UTC
[1/2] beam git commit: [BEAM-1347] Add utility to be able to model
inbound reading as a single input stream
Repository: beam
Updated Branches:
refs/heads/master d4f9e9268 -> 38f189063
[BEAM-1347] Add utility to be able to model inbound reading as a single input stream
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac7f9739
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac7f9739
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac7f9739
Branch: refs/heads/master
Commit: ac7f9739b01626abc559748ae983f6eb988430af
Parents: d4f9e92
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jul 25 09:02:41 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 3 12:19:17 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/stream/DataStreams.java | 158 +++++++++++++++++++
.../beam/fn/harness/stream/DataStreamsTest.java | 92 +++++++++++
2 files changed, 250 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ac7f9739/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
new file mode 100644
index 0000000..d23d784
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.stream;
+
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+
+/**
+ * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
+ * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as mulitple
+ * {@link ByteString}s.
+ */
+public class DataStreams {
+ /**
+ * Converts multiple {@link ByteString}s into a single {@link InputStream}.
+ *
+ * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until
+ * either it knows that no more values will be provided or it has the next {@link ByteString}.
+ */
+ public static InputStream inbound(Iterator<ByteString> bytes) {
+ return new Inbound(bytes);
+ }
+
+ /**
+ * Converts a single {@link OutputStream} into multiple {@link ByteString}s.
+ */
+ public static OutputStream outbound(CloseableThrowingConsumer<ByteString> consumer) {
+ // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * An input stream which concatenates multiple {@link ByteString}s. Lazily accesses the
+ * first {@link Iterator} on first access of this input stream.
+ *
+ * <p>Closing this input stream has no effect.
+ */
+ private static class Inbound<T> extends InputStream {
+ private static final InputStream EMPTY_STREAM = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return -1;
+ }
+ };
+
+ private final Iterator<ByteString> bytes;
+ private InputStream currentStream;
+
+ public Inbound(Iterator<ByteString> bytes) {
+ this.currentStream = EMPTY_STREAM;
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int rval = -1;
+ // Move on to the next stream if we have read nothing
+ while ((rval = currentStream.read()) == -1 && bytes.hasNext()) {
+ currentStream = bytes.next().newInput();
+ }
+ return rval;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int remainingLen = len;
+ while ((remainingLen -= ByteStreams.read(
+ currentStream, b, off + len - remainingLen, remainingLen)) > 0) {
+ if (bytes.hasNext()) {
+ currentStream = bytes.next().newInput();
+ } else {
+ int bytesRead = len - remainingLen;
+ return bytesRead > 0 ? bytesRead : -1;
+ }
+ }
+ return len - remainingLen;
+ }
+ }
+
+ /**
+ * Allows for one or more writing threads to append values to this iterator while one reading
+ * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
+ * available or this has been closed.
+ *
+ * <p>External synchronization must be provided if multiple readers would like to access the
+ * {@link Iterator#hasNext()} and {@link Iterator#next()} methods.
+ *
+ * <p>The order or values which are appended to this iterator is nondeterministic when multiple
+ * threads call {@link #accept(Object)}.
+ */
+ public static class BlockingQueueIterator<T> implements
+ CloseableThrowingConsumer<T>, Iterator<T> {
+ private static final Object POISION_PILL = new Object();
+ private final BlockingQueue<T> queue;
+
+ /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */
+ private T currentElement;
+
+ public BlockingQueueIterator(BlockingQueue<T> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void close() throws Exception {
+ queue.put((T) POISION_PILL);
+ }
+
+ @Override
+ public void accept(T t) throws Exception {
+ queue.put(t);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (currentElement == null) {
+ try {
+ currentElement = queue.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ return currentElement != POISION_PILL;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ T rval = currentElement;
+ currentElement = null;
+ return rval;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ac7f9739/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
new file mode 100644
index 0000000..d141570
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.stream;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterators;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.fn.harness.stream.DataStreams.BlockingQueueIterator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests covering {@link DataStreams} and its {@link BlockingQueueIterator}. */
+@RunWith(JUnit4.class)
+public class DataStreamsTest {
+  private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
+  private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
+
+  /** No chunks, or only empty chunks, must read back as an empty result. */
+  @Test
+  public void testEmptyRead() throws Exception {
+    assertEquals(ByteString.EMPTY, read());
+    assertEquals(ByteString.EMPTY, read(ByteString.EMPTY));
+    assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY));
+  }
+
+  /** Chunks are concatenated in order, and empty chunks in any position are skipped. */
+  @Test
+  public void testRead() throws Exception {
+    ByteString expected = BYTES_A.concat(BYTES_B);
+    assertEquals(expected, read(BYTES_A, BYTES_B));
+    assertEquals(expected, read(BYTES_A, ByteString.EMPTY, BYTES_B));
+    assertEquals(expected, read(BYTES_A, BYTES_B, ByteString.EMPTY));
+  }
+
+  /** The queue has room for both values and the end marker, so nothing blocks. */
+  @Test(timeout = 10_000)
+  public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
+    BlockingQueueIterator<String> iterator =
+        new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3));
+
+    iterator.accept("A");
+    iterator.accept("B");
+    iterator.close();
+
+    List<String> drained = Arrays.asList(Iterators.toArray(iterator, String.class));
+    assertEquals(Arrays.asList("A", "B"), drained);
+  }
+
+  /** Every hand-off has to rendezvous with a reader running on another thread. */
+  @Test(timeout = 10_000)
+  public void testBlockingQueueIteratorWithBlocking() throws Exception {
+    // A SynchronousQueue has no capacity: each put blocks until a matching take arrives,
+    // and vice versa.
+    final BlockingQueueIterator<String> iterator =
+        new BlockingQueueIterator<>(new SynchronousQueue<>());
+    final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
+
+    // This thread consumes the iterator until it is closed and publishes what it saw.
+    Thread reader = new Thread(
+        () -> valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class))));
+    reader.start();
+
+    iterator.accept("A");
+    iterator.accept("B");
+    iterator.close();
+
+    assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
+    reader.join();
+  }
+
+  /** Reads the concatenation of {@code bytes} back through {@link DataStreams#inbound}. */
+  private static ByteString read(ByteString... bytes) throws IOException {
+    List<ByteString> chunks = Arrays.asList(bytes);
+    return ByteString.readFrom(DataStreams.inbound(chunks.iterator()));
+  }
+}
[2/2] beam git commit: [BEAM-1347] Add utility to be able to model
inbound reading as a single input stream
Posted by lc...@apache.org.
[BEAM-1347] Add utility to be able to model inbound reading as a single input stream
This closes #3638
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38f18906
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38f18906
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38f18906
Branch: refs/heads/master
Commit: 38f18906335c5ed4cebd1dddb6f94688589670a4
Parents: d4f9e92 ac7f973
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 3 12:19:49 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 3 12:19:49 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/stream/DataStreams.java | 158 +++++++++++++++++++
.../beam/fn/harness/stream/DataStreamsTest.java | 92 +++++++++++
2 files changed, 250 insertions(+)
----------------------------------------------------------------------