You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/03 19:19:58 UTC

[1/2] beam git commit: [BEAM-1347] Add utility to be able to model inbound reading as a single input stream

Repository: beam
Updated Branches:
  refs/heads/master d4f9e9268 -> 38f189063


[BEAM-1347] Add utility to be able to model inbound reading as a single input stream


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac7f9739
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac7f9739
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac7f9739

Branch: refs/heads/master
Commit: ac7f9739b01626abc559748ae983f6eb988430af
Parents: d4f9e92
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jul 25 09:02:41 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 3 12:19:17 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/stream/DataStreams.java     | 158 +++++++++++++++++++
 .../beam/fn/harness/stream/DataStreamsTest.java |  92 +++++++++++
 2 files changed, 250 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ac7f9739/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
new file mode 100644
index 0000000..d23d784
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.stream;
+
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+
+/**
+ * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
+ * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as multiple
+ * {@link ByteString}s.
+ */
+public class DataStreams {
+  /**
+   * Converts multiple {@link ByteString}s into a single {@link InputStream}.
+   *
+   * <p>The iterator is only consulted when the returned stream is read. It should block until it
+   * either has the next {@link ByteString} or knows that no more values will be provided.
+   */
+  public static InputStream inbound(Iterator<ByteString> bytes) {
+    return new Inbound(bytes); // NOTE(review): raw type; Inbound's <T> is never used
+  }
+
+  /**
+   * Converts a single {@link OutputStream} into multiple {@link ByteString}s.
+   */
+  public static OutputStream outbound(CloseableThrowingConsumer<ByteString> consumer) {
+    // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver
+    throw new UnsupportedOperationException(); // not implemented yet, see the TODO above
+  }
+
+  /**
+   * An input stream which concatenates multiple {@link ByteString}s. The {@link Iterator} is
+   * first consulted on the first read of this input stream, not at construction time.
+   *
+   * <p>Closing this input stream has no effect.
+   */
+  private static class Inbound<T> extends InputStream {
+    private static final InputStream EMPTY_STREAM = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1; // always at end of stream
+      }
+    };
+
+    private final Iterator<ByteString> bytes;
+    private InputStream currentStream;
+
+    public Inbound(Iterator<ByteString> bytes) {
+      this.currentStream = EMPTY_STREAM; // makes the first read advance to the iterator
+      this.bytes = bytes;
+    }
+
+    @Override
+    public int read() throws IOException {
+      int rval = -1;
+      // Skip exhausted or empty chunks; stop at the first byte or once the iterator is drained
+      while ((rval = currentStream.read()) == -1 && bytes.hasNext()) {
+        currentStream = bytes.next().newInput();
+      }
+      return rval;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int remainingLen = len; // bytes still to be filled in b
+      while ((remainingLen -= ByteStreams.read(
+          currentStream, b, off + len - remainingLen, remainingLen)) > 0) {
+        if (bytes.hasNext()) { // NOTE(review): may block with b partially filled, confirm intended
+          currentStream = bytes.next().newInput();
+        } else {
+          int bytesRead = len - remainingLen;
+          return bytesRead > 0 ? bytesRead : -1; // -1 (end of stream) only if nothing was read
+        }
+      }
+      return len - remainingLen;
+    }
+  }
+
+  /**
+   * Allows for one or more writing threads to append values to this iterator while one reading
+   * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
+   * available or this has been closed.
+   *
+   * <p>External synchronization must be provided if multiple readers would like to access the
+   * {@link Iterator#hasNext()} and {@link Iterator#next()} methods.
+   *
+   * <p>The order of values which are appended to this iterator is nondeterministic when multiple
+   * threads call {@link #accept(Object)}.
+   */
+  public static class BlockingQueueIterator<T> implements
+      CloseableThrowingConsumer<T>, Iterator<T> {
+    private static final Object POISION_PILL = new Object(); // end marker put by close(); NOTE(review): name misspells POISON
+    private final BlockingQueue<T> queue;
+
+    /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */
+    private T currentElement; // null while no element is buffered
+
+    public BlockingQueueIterator(BlockingQueue<T> queue) {
+      this.queue = queue;
+    }
+
+    @Override
+    public void close() throws Exception {
+      queue.put((T) POISION_PILL); // unchecked cast; hasNext() compares it by identity
+    }
+
+    @Override
+    public void accept(T t) throws Exception {
+      queue.put(t);
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (currentElement == null) { // nothing buffered: wait for a value or the end marker
+        try {
+          currentElement = queue.take();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
+          throw new IllegalStateException(e);
+        }
+      }
+      return currentElement != POISION_PILL; // the marker stays buffered once it has been taken
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      T rval = currentElement;
+      currentElement = null; // the following hasNext() call takes from the queue again
+      return rval;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7f9739/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
new file mode 100644
index 0000000..d141570
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.stream;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterators;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.fn.harness.stream.DataStreams.BlockingQueueIterator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataStreams}. */
+@RunWith(JUnit4.class)
+public class DataStreamsTest {
+  private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
+  private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
+
+  @Test
+  public void testEmptyRead() throws Exception {
+    assertEquals(ByteString.EMPTY, read()); // no chunks at all
+    assertEquals(ByteString.EMPTY, read(ByteString.EMPTY)); // a single empty chunk
+    assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY)); // only empty chunks
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B)); // adjacent chunks
+    assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B)); // empty middle
+    assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY)); // empty tail
+  }
+
+  @Test(timeout = 10_000)
+  public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
+    BlockingQueueIterator<String> iterator =
+        new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3)); // room for A, B and the end marker
+
+    iterator.accept("A");
+    iterator.accept("B");
+    iterator.close();
+
+    assertEquals(Arrays.asList("A", "B"),
+        Arrays.asList(Iterators.toArray(iterator, String.class)));
+  }
+
+  @Test(timeout = 10_000)
+  public void testBlockingQueueIteratorWithBlocking() throws Exception {
+    // The synchronous queue only allows for one element to transfer at a time and blocks
+    // the sending/receiving parties until both parties are there.
+    final BlockingQueueIterator<String> iterator =
+        new BlockingQueueIterator<>(new SynchronousQueue<>());
+    final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
+    Thread appender = new Thread() { // NOTE(review): despite its name this thread drains the iterator
+      @Override
+      public void run() {
+        valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class)));
+      }
+    };
+    appender.start();
+    iterator.accept("A");
+    iterator.accept("B");
+    iterator.close();
+    assertEquals(Arrays.asList("A", "B"), valuesFuture.get()); // NOTE(review): blocks until the timeout if the reader thread throws
+    appender.join();
+  }
+
+  private static ByteString read(ByteString... bytes) throws IOException { // round-trips the chunks through DataStreams.inbound
+    return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
+  }
+}


[2/2] beam git commit: [BEAM-1347] Add utility to be able to model inbound reading as a single input stream

Posted by lc...@apache.org.
[BEAM-1347] Add utility to be able to model inbound reading as a single input stream

This closes #3638


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38f18906
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38f18906
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38f18906

Branch: refs/heads/master
Commit: 38f18906335c5ed4cebd1dddb6f94688589670a4
Parents: d4f9e92 ac7f973
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 3 12:19:49 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 3 12:19:49 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/stream/DataStreams.java     | 158 +++++++++++++++++++
 .../beam/fn/harness/stream/DataStreamsTest.java |  92 +++++++++++
 2 files changed, 250 insertions(+)
----------------------------------------------------------------------