You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/18 22:00:24 UTC
[1/2] beam git commit: [BEAM-1347] Convert an InputStream into an
Iterable<T> using the Beam Fn data specification
Repository: beam
Updated Branches:
refs/heads/master ae9a2dcfd -> 95cd37faf
[BEAM-1347] Convert an InputStream into an Iterable<T> using the Beam Fn data specification
This is towards sharing common code that supports the Beam Fn State API and the Beam Fn Data API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b949aa1b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b949aa1b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b949aa1b
Branch: refs/heads/master
Commit: b949aa1bbfd7fbb1a8159e6d650dae6196015e5c
Parents: ae9a2dc
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 16 16:44:59 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 18 14:59:49 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/stream/DataStreams.java | 73 +++++++-
.../beam/fn/harness/stream/DataStreamsTest.java | 165 ++++++++++++++-----
2 files changed, 192 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b949aa1b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
index d23d784..6967160 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
@@ -17,19 +17,24 @@
*/
package org.apache.beam.fn.harness.stream;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingInputStream;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PushbackInputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.sdk.coders.Coder;
/**
* {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
- * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as mulitple
+ * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as multiple
* {@link ByteString}s.
*/
public class DataStreams {
@@ -100,6 +105,72 @@ public class DataStreams {
}
/**
+ * An adapter which converts an {@link InputStream} to an {@link Iterator} of {@code T} values
+ * using the specified {@link Coder}.
+ *
+ * <p>Note that this adapter follows the Beam Fn API specification for forcing values whose
+ * decoding consumes zero bytes to consume exactly one byte.
+ *
+ * <p>Note that the underlying {@link InputStream} is accessed lazily: nothing is read from it
+ * until the first call to {@link #next()} or {@link #hasNext()}.
+ */
+ public static class DataStreamDecoder<T> implements Iterator<T> {
+ private enum State { READ_REQUIRED, HAS_NEXT, EOF }; // READ_REQUIRED: must decode before answering; HAS_NEXT: a value is buffered in `next`; EOF: stream exhausted
+
+ private final CountingInputStream countingInputStream; // wraps pushbackInputStream; used to measure how many bytes a decode consumed
+ private final PushbackInputStream pushbackInputStream; // one-byte pushback buffer, used to peek for end of stream
+ private final Coder<T> coder;
+ private State currentState;
+ private T next; // value decoded by hasNext() and handed out by next()
+ public DataStreamDecoder(Coder<T> coder, InputStream inputStream) {
+ this.currentState = State.READ_REQUIRED;
+ this.coder = coder;
+ this.pushbackInputStream = new PushbackInputStream(inputStream, 1);
+ this.countingInputStream = new CountingInputStream(pushbackInputStream);
+ }
+
+ @Override
+ public boolean hasNext() {
+ switch (currentState) {
+ case EOF:
+ return false;
+ case READ_REQUIRED:
+ try {
+ int nextByte = pushbackInputStream.read(); // peek directly on the pushback stream so the byte is not counted
+ if (nextByte == -1) {
+ currentState = State.EOF;
+ return false;
+ }
+
+ pushbackInputStream.unread(nextByte); // put the peeked byte back so the coder sees it
+ long count = countingInputStream.getCount(); // byte count before decoding
+ next = coder.decode(countingInputStream);
+ // Skip one byte if decoding the value consumed 0 bytes.
+ if (countingInputStream.getCount() - count == 0) {
+ checkState(countingInputStream.read() != -1, "Unexpected EOF reached");
+ }
+ currentState = State.HAS_NEXT;
+ } catch (IOException e) {
+ throw new IllegalStateException(e); // Iterator#hasNext declares no checked exceptions, so rethrow unchecked
+ }
+ // fall through expected
+ case HAS_NEXT:
+ return true;
+ }
+ throw new IllegalStateException(String.format("Unknown state %s", currentState)); // every State constant is handled by the switch above
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ currentState = State.READ_REQUIRED; // the following hasNext() call must decode a fresh value
+ return next;
+ }
+ }
+
+ /**
* Allows for one or more writing threads to append values to this iterator while one reading
* thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
* available or this has been closed.
http://git-wip-us.apache.org/repos/asf/beam/blob/b949aa1b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
index d141570..f7a87e1 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
@@ -17,76 +17,151 @@
*/
package org.apache.beam.fn.harness.stream;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
import com.google.common.collect.Iterators;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import org.apache.beam.fn.harness.stream.DataStreams.BlockingQueueIterator;
+import org.apache.beam.fn.harness.stream.DataStreams.DataStreamDecoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link DataStreams}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
public class DataStreamsTest {
- private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
- private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
-
- @Test
- public void testEmptyRead() throws Exception {
- assertEquals(ByteString.EMPTY, read());
- assertEquals(ByteString.EMPTY, read(ByteString.EMPTY));
- assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY));
- }
- @Test
- public void testRead() throws Exception {
- assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B));
- assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B));
- assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY));
+ /** Tests for {@link DataStreams.Inbound}. */
+ @RunWith(JUnit4.class)
+ public static class InboundTest {
+ private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
+ private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
+
+ @Test
+ public void testEmptyRead() throws Exception {
+ assertEquals(ByteString.EMPTY, read()); // no chunks at all
+ assertEquals(ByteString.EMPTY, read(ByteString.EMPTY)); // a single empty chunk
+ assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY)); // several empty chunks
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B));
+ assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B)); // empty chunk in the middle
+ assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY)); // empty chunk at the end
+ }
+
+ private static ByteString read(ByteString... bytes) throws IOException { // feeds the chunks through DataStreams.inbound and reads the stream back as one ByteString
+ return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
+ }
+ }
- @Test(timeout = 10_000)
- public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
- BlockingQueueIterator<String> iterator =
- new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3));
+ /** Tests for {@link DataStreams.BlockingQueueIterator}. */
+ @RunWith(JUnit4.class)
+ public static class BlockingQueueIteratorTest {
+ @Test(timeout = 10_000)
+ public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
+ BlockingQueueIterator<String> iterator =
+ new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3));
- iterator.accept("A");
- iterator.accept("B");
- iterator.close();
+ iterator.accept("A");
+ iterator.accept("B");
+ iterator.close();
- assertEquals(Arrays.asList("A", "B"),
- Arrays.asList(Iterators.toArray(iterator, String.class)));
+ assertEquals(Arrays.asList("A", "B"),
+ Arrays.asList(Iterators.toArray(iterator, String.class)));
+ }
+
+ @Test(timeout = 10_000)
+ public void testBlockingQueueIteratorWithBlocking() throws Exception {
+ // The synchronous queue only allows for one element to transfer at a time and blocks
+ // the sending/receiving parties until both parties are there.
+ final BlockingQueueIterator<String> iterator =
+ new BlockingQueueIterator<>(new SynchronousQueue<>());
+ final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
+ Thread appender = new Thread() {
+ @Override
+ public void run() {
+ valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class)));
+ }
+ };
+ appender.start();
+ iterator.accept("A");
+ iterator.accept("B");
+ iterator.close();
+ assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
+ appender.join();
+ }
}
- @Test(timeout = 10_000)
- public void testBlockingQueueIteratorWithBlocking() throws Exception {
- // The synchronous queue only allows for one element to transfer at a time and blocks
- // the sending/receiving parties until both parties are there.
- final BlockingQueueIterator<String> iterator =
- new BlockingQueueIterator<>(new SynchronousQueue<>());
- final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
- Thread appender = new Thread() {
- @Override
- public void run() {
- valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class)));
+ /** Tests for {@link DataStreams.DataStreamDecoder}. */
+ @RunWith(JUnit4.class)
+ public static class DataStreamDecoderTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testEmptyInputStream() throws Exception {
+ testDecoderWith(StringUtf8Coder.of());
+ }
+
+ @Test
+ public void testNonEmptyInputStream() throws Exception {
+ testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ");
+ }
+
+ @Test
+ public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception {
+ CountingOutputStream countingOutputStream =
+ new CountingOutputStream(ByteStreams.nullOutputStream());
+ GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, countingOutputStream);
+ assumeTrue(countingOutputStream.getCount() == 0);
+
+ testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+ }
+
+ private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ for (T value : expected) {
+ int size = baos.size();
+ coder.encode(value, baos);
+ // Pad an arbitrary byte when values encode to zero bytes
+ if (baos.size() - size == 0) {
+ baos.write(0);
+ }
}
- };
- appender.start();
- iterator.accept("A");
- iterator.accept("B");
- iterator.close();
- assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
- appender.join();
- }
- private static ByteString read(ByteString... bytes) throws IOException {
- return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
+ Iterator<T> decoder =
+ new DataStreamDecoder<>(coder, new ByteArrayInputStream(baos.toByteArray()));
+
+ Object[] actual = Iterators.toArray(decoder, Object.class);
+ assertArrayEquals(expected, actual);
+
+ assertFalse(decoder.hasNext());
+ assertFalse(decoder.hasNext());
+
+ thrown.expect(NoSuchElementException.class);
+ decoder.next();
+ }
}
}
[2/2] beam git commit: [BEAM-1347] Convert an InputStream into an
Iterable<T> using the Beam Fn data specification
Posted by lc...@apache.org.
[BEAM-1347] Convert an InputStream into an Iterable<T> using the Beam Fn data specification
This closes #3724
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95cd37fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95cd37fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95cd37fa
Branch: refs/heads/master
Commit: 95cd37faff1c57dc4184ed001d8d168bc42d174e
Parents: ae9a2dc b949aa1
Author: Luke Cwik <lc...@google.com>
Authored: Fri Aug 18 15:00:15 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 18 15:00:15 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/stream/DataStreams.java | 73 +++++++-
.../beam/fn/harness/stream/DataStreamsTest.java | 165 ++++++++++++++-----
2 files changed, 192 insertions(+), 46 deletions(-)
----------------------------------------------------------------------