You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/18 22:00:24 UTC

[1/2] beam git commit: [BEAM-1347] Convert an InputStream into an Iterable<T> using the Beam Fn data specification

Repository: beam
Updated Branches:
  refs/heads/master ae9a2dcfd -> 95cd37faf


[BEAM-1347] Convert an InputStream into an Iterable<T> using the Beam Fn data specification

This is towards sharing common code that supports the Beam Fn State API and the Beam Fn Data API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b949aa1b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b949aa1b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b949aa1b

Branch: refs/heads/master
Commit: b949aa1bbfd7fbb1a8159e6d650dae6196015e5c
Parents: ae9a2dc
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 16 16:44:59 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 18 14:59:49 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/stream/DataStreams.java     |  73 +++++++-
 .../beam/fn/harness/stream/DataStreamsTest.java | 165 ++++++++++++++-----
 2 files changed, 192 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b949aa1b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
index d23d784..6967160 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
@@ -17,19 +17,24 @@
  */
 package org.apache.beam.fn.harness.stream;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingInputStream;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PushbackInputStream;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.BlockingQueue;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.sdk.coders.Coder;
 
 /**
  * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
- * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as mulitple
+ * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as multiple
  * {@link ByteString}s.
  */
 public class DataStreams {
@@ -100,6 +105,72 @@ public class DataStreams {
   }
 
   /**
+   * An adapter which converts an {@link InputStream} to an {@link Iterator} of {@code T} values
+   * using the specified {@link Coder}.
+   *
+   * <p>Note that this adapter follows the Beam Fn API specification for forcing values that decode
+   * consuming zero bytes to consuming exactly one byte.
+   *
+   * <p>Note that access to the underlying {@link InputStream} is lazy and will only be invoked on
+   * first access to {@link #next()} or {@link #hasNext()}.
+   */
+  public static class DataStreamDecoder<T> implements Iterator<T> {
+    private enum State { READ_REQUIRED, HAS_NEXT, EOF };
+
+    private final CountingInputStream countingInputStream;
+    private final PushbackInputStream pushbackInputStream;
+    private final Coder<T> coder;
+    private State currentState;
+    private T next;
+    public DataStreamDecoder(Coder<T> coder, InputStream inputStream) {
+      this.currentState = State.READ_REQUIRED;
+      this.coder = coder;
+      this.pushbackInputStream = new PushbackInputStream(inputStream, 1);
+      this.countingInputStream = new CountingInputStream(pushbackInputStream);
+    }
+
+    @Override
+    public boolean hasNext() {
+      switch (currentState) {
+        case EOF:
+          return false;
+        case READ_REQUIRED:
+          try {
+            int nextByte = pushbackInputStream.read();
+            if (nextByte == -1) {
+              currentState = State.EOF;
+              return false;
+            }
+
+            pushbackInputStream.unread(nextByte);
+            long count = countingInputStream.getCount();
+            next = coder.decode(countingInputStream);
+            // Skip one byte if decoding the value consumed 0 bytes.
+            if (countingInputStream.getCount() - count == 0) {
+              checkState(countingInputStream.read() != -1, "Unexpected EOF reached");
+            }
+            currentState = State.HAS_NEXT;
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+          // fall through expected
+        case HAS_NEXT:
+          return true;
+      }
+      throw new IllegalStateException(String.format("Unknown state %s", currentState));
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      currentState = State.READ_REQUIRED;
+      return next;
+    }
+  }
+
+  /**
    * Allows for one or more writing threads to append values to this iterator while one reading
    * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
    * available or this has been closed.

http://git-wip-us.apache.org/repos/asf/beam/blob/b949aa1b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
index d141570..f7a87e1 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
@@ -17,76 +17,151 @@
  */
 package org.apache.beam.fn.harness.stream;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
 
 import com.google.common.collect.Iterators;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.SynchronousQueue;
 import org.apache.beam.fn.harness.stream.DataStreams.BlockingQueueIterator;
+import org.apache.beam.fn.harness.stream.DataStreams.DataStreamDecoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link DataStreams}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
 public class DataStreamsTest {
-  private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
-  private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
-
-  @Test
-  public void testEmptyRead() throws Exception {
-    assertEquals(ByteString.EMPTY, read());
-    assertEquals(ByteString.EMPTY, read(ByteString.EMPTY));
-    assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY));
-  }
 
-  @Test
-  public void testRead() throws Exception {
-    assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B));
-    assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B));
-    assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY));
+  /** Tests for {@link DataStreams.Inbound}. */
+  @RunWith(JUnit4.class)
+  public static class InboundTest {
+    private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
+    private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
+
+    @Test
+    public void testEmptyRead() throws Exception {
+      assertEquals(ByteString.EMPTY, read());
+      assertEquals(ByteString.EMPTY, read(ByteString.EMPTY));
+      assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY));
+    }
+
+    @Test
+    public void testRead() throws Exception {
+      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B));
+      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B));
+      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY));
+    }
+
+    private static ByteString read(ByteString... bytes) throws IOException {
+      return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
+    }
   }
 
-  @Test(timeout = 10_000)
-  public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
-    BlockingQueueIterator<String> iterator =
-        new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3));
+  /** Tests for {@link DataStreams.BlockingQueueIterator}. */
+  @RunWith(JUnit4.class)
+  public static class BlockingQueueIteratorTest {
+    @Test(timeout = 10_000)
+    public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
+      BlockingQueueIterator<String> iterator =
+          new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3));
 
-    iterator.accept("A");
-    iterator.accept("B");
-    iterator.close();
+      iterator.accept("A");
+      iterator.accept("B");
+      iterator.close();
 
-    assertEquals(Arrays.asList("A", "B"),
-        Arrays.asList(Iterators.toArray(iterator, String.class)));
+      assertEquals(Arrays.asList("A", "B"),
+          Arrays.asList(Iterators.toArray(iterator, String.class)));
+    }
+
+    @Test(timeout = 10_000)
+    public void testBlockingQueueIteratorWithBlocking() throws Exception {
+      // The synchronous queue only allows for one element to transfer at a time and blocks
+      // the sending/receiving parties until both parties are there.
+      final BlockingQueueIterator<String> iterator =
+          new BlockingQueueIterator<>(new SynchronousQueue<>());
+      final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
+      Thread appender = new Thread() {
+        @Override
+        public void run() {
+          valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class)));
+        }
+      };
+      appender.start();
+      iterator.accept("A");
+      iterator.accept("B");
+      iterator.close();
+      assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
+      appender.join();
+    }
   }
 
-  @Test(timeout = 10_000)
-  public void testBlockingQueueIteratorWithBlocking() throws Exception {
-    // The synchronous queue only allows for one element to transfer at a time and blocks
-    // the sending/receiving parties until both parties are there.
-    final BlockingQueueIterator<String> iterator =
-        new BlockingQueueIterator<>(new SynchronousQueue<>());
-    final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
-    Thread appender = new Thread() {
-      @Override
-      public void run() {
-        valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class)));
+  /** Tests for {@link DataStreams.DataStreamDecoder}. */
+  @RunWith(JUnit4.class)
+  public static class DataStreamDecoderTest {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testEmptyInputStream() throws Exception {
+      testDecoderWith(StringUtf8Coder.of());
+    }
+
+    @Test
+    public void testNonEmptyInputStream() throws Exception {
+      testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ");
+    }
+
+    @Test
+    public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception {
+      CountingOutputStream countingOutputStream =
+          new CountingOutputStream(ByteStreams.nullOutputStream());
+      GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, countingOutputStream);
+      assumeTrue(countingOutputStream.getCount() == 0);
+
+      testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+    }
+
+    private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      for (T value : expected) {
+        int size = baos.size();
+        coder.encode(value, baos);
+        // Pad an arbitrary byte when values encode to zero bytes
+        if (baos.size() - size == 0) {
+          baos.write(0);
+        }
       }
-    };
-    appender.start();
-    iterator.accept("A");
-    iterator.accept("B");
-    iterator.close();
-    assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
-    appender.join();
-  }
 
-  private static ByteString read(ByteString... bytes) throws IOException {
-    return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
+      Iterator<T> decoder =
+          new DataStreamDecoder<>(coder, new ByteArrayInputStream(baos.toByteArray()));
+
+      Object[] actual = Iterators.toArray(decoder, Object.class);
+      assertArrayEquals(expected, actual);
+
+      assertFalse(decoder.hasNext());
+      assertFalse(decoder.hasNext());
+
+      thrown.expect(NoSuchElementException.class);
+      decoder.next();
+    }
   }
 }


[2/2] beam git commit: [BEAM-1347] Convert an InputStream into an Iterable<T> using the Beam Fn data specification

Posted by lc...@apache.org.
[BEAM-1347] Convert an InputStream into an Iterable<T> using the Beam Fn data specification

This closes #3724


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95cd37fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95cd37fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95cd37fa

Branch: refs/heads/master
Commit: 95cd37faff1c57dc4184ed001d8d168bc42d174e
Parents: ae9a2dc b949aa1
Author: Luke Cwik <lc...@google.com>
Authored: Fri Aug 18 15:00:15 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 18 15:00:15 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/stream/DataStreams.java     |  73 +++++++-
 .../beam/fn/harness/stream/DataStreamsTest.java | 165 ++++++++++++++-----
 2 files changed, 192 insertions(+), 46 deletions(-)
----------------------------------------------------------------------