You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/06 23:25:39 UTC
[2/2] incubator-beam git commit: Implement InProcessCreate with an OffsetBasedSource
Implement InProcessCreate with an OffsetBasedSource
This allows dynamic splits to be exercised in InProcessCreate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/788611d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/788611d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/788611d6
Branch: refs/heads/master
Commit: 788611d6997f297f2e795dd6dda88e29fe055368
Parents: 15a8334
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 18 11:00:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 6 14:25:09 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/inprocess/InProcessCreate.java | 147 ++++++++++---------
.../runners/inprocess/InProcessCreateTest.java | 62 ++++++--
2 files changed, 123 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/788611d6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
index efda8fc..4dbc34d 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
@@ -21,6 +21,8 @@ import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
+import com.google.cloud.dataflow.sdk.io.OffsetBasedSource.OffsetBasedReader;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -32,12 +34,8 @@ import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
@@ -74,7 +72,7 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
}
InMemorySource<T> source;
try {
- source = new InMemorySource<>(original.getElements(), elementCoder);
+ source = InMemorySource.fromIterable(original.getElements(), elementCoder);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -89,56 +87,35 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
}
@VisibleForTesting
- static class InMemorySource<T> extends BoundedSource<T> {
- private final Collection<byte[]> allElementsBytes;
+ static class InMemorySource<T> extends OffsetBasedSource<T> {
+ private final List<byte[]> allElementsBytes;
private final long totalSize;
private final Coder<T> coder;
- public InMemorySource(Iterable<T> elements, Coder<T> elemCoder)
+ public static <T> InMemorySource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
throws CoderException, IOException {
- allElementsBytes = new ArrayList<>();
+ ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
long totalSize = 0L;
for (T element : elements) {
byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
allElementsBytes.add(bytes);
totalSize += bytes.length;
}
- this.totalSize = totalSize;
- this.coder = elemCoder;
+ return new InMemorySource<>(allElementsBytes.build(), totalSize, elemCoder);
}
/**
* Create a new source with the specified bytes. The new source owns the input element bytes,
* which must not be modified after this constructor is called.
*/
- private InMemorySource(Collection<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+ private InMemorySource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+ super(0, elementBytes.size(), 1);
this.allElementsBytes = ImmutableList.copyOf(elementBytes);
this.totalSize = totalSize;
this.coder = coder;
}
@Override
- public List<? extends BoundedSource<T>> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- ImmutableList.Builder<InMemorySource<T>> resultBuilder = ImmutableList.builder();
- long currentSourceSize = 0L;
- List<byte[]> currentElems = new ArrayList<>();
- for (byte[] elemBytes : allElementsBytes) {
- currentElems.add(elemBytes);
- currentSourceSize += elemBytes.length;
- if (currentSourceSize >= desiredBundleSizeBytes) {
- resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
- currentElems.clear();
- currentSourceSize = 0L;
- }
- }
- if (!currentElems.isEmpty()) {
- resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
- }
- return resultBuilder.build();
- }
-
- @Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return totalSize;
}
@@ -150,7 +127,7 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
@Override
public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- return new BytesReader();
+ return new BytesReader<>(this);
}
@Override
@@ -161,50 +138,80 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
return coder;
}
- private class BytesReader extends BoundedReader<T> {
- private final PeekingIterator<byte[]> iter;
- /**
- * Use an optional to distinguish between null next element (as Optional.absent()) and no next
- * element (next is null).
- */
- @Nullable private Optional<T> next;
+ @Override
+ public long getMaxEndOffset(PipelineOptions options) throws Exception {
+ return allElementsBytes.size();
+ }
- public BytesReader() {
- this.iter = Iterators.peekingIterator(allElementsBytes.iterator());
- }
+ @Override
+ public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+ List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
+ long primarySizeEstimate =
+ (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
+ return new InMemorySource<>(primaryElems, primarySizeEstimate, coder);
+ }
- @Override
- public BoundedSource<T> getCurrentSource() {
- return InMemorySource.this;
+ @Override
+ public long getBytesPerOffset() {
+ if (allElementsBytes.size() == 0) {
+ return 0L;
}
+ return totalSize / allElementsBytes.size();
+ }
+ }
- @Override
- public boolean start() throws IOException {
- return advance();
- }
+ private static class BytesReader<T> extends OffsetBasedReader<T> {
+ private int index;
+ /**
+ * Use an optional to distinguish between null next element (as Optional.absent()) and no next
+ * element (next is null).
+ */
+ @Nullable private Optional<T> next;
- @Override
- public boolean advance() throws IOException {
- boolean hasNext = iter.hasNext();
- if (hasNext) {
- next = Optional.fromNullable(CoderUtils.decodeFromByteArray(coder, iter.next()));
- } else {
- next = null;
- }
- return hasNext;
- }
+ public BytesReader(InMemorySource<T> source) {
+ super(source);
+ index = -1;
+ }
- @Override
- @Nullable
- public T getCurrent() throws NoSuchElementException {
- if (next == null) {
- throw new NoSuchElementException();
- }
- return next.orNull();
+ @Override
+ @Nullable
+ public T getCurrent() throws NoSuchElementException {
+ if (next == null) {
+ throw new NoSuchElementException();
}
+ return next.orNull();
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ protected long getCurrentOffset() {
+ return index;
+ }
- @Override
- public void close() throws IOException {}
+ @Override
+ protected boolean startImpl() throws IOException {
+ return advanceImpl();
+ }
+
+ @Override
+ public synchronized InMemorySource<T> getCurrentSource() {
+ return (InMemorySource<T>) super.getCurrentSource();
+ }
+
+ @Override
+ protected boolean advanceImpl() throws IOException {
+ InMemorySource<T> source = getCurrentSource();
+ index++;
+ if (index >= source.allElementsBytes.size()) {
+ return false;
+ }
+ next =
+ Optional.fromNullable(
+ CoderUtils.decodeFromByteArray(
+ source.coder, source.allElementsBytes.get(index)));
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/788611d6/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
index 227c388..1956b20 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
@@ -24,11 +24,11 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.NullableCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
@@ -36,6 +36,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessCreate.InMemorySource;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -46,6 +47,7 @@ import com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -54,8 +56,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
/**
* Tests for {@link InProcessCreate}.
@@ -66,6 +69,7 @@ public class InProcessCreateTest {
public ExpectedException thrown = ExpectedException.none();
@Test
+ @Category(RunnableOnService.class)
public void testConvertsCreate() {
TestPipeline p = TestPipeline.create();
Create.Values<Integer> og = Create.of(1, 2, 3);
@@ -78,6 +82,7 @@ public class InProcessCreateTest {
}
@Test
+ @Category(RunnableOnService.class)
public void testConvertsCreateWithNullElements() {
Create.Values<String> og =
Create.<String>of("foo", null, "spam", "ham", null, "eggs")
@@ -133,7 +138,7 @@ public class InProcessCreateTest {
return myString.equals(((UnserializableRecord) o).myString);
}
- static class UnserializableRecordCoder extends StandardCoder<UnserializableRecord> {
+ static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
private final Coder<String> stringCoder = StringUtf8Coder.of();
@Override
@@ -151,17 +156,28 @@ public class InProcessCreateTest {
throws CoderException, IOException {
return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
}
+ }
+ }
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
+ @Test
+ @Category(RunnableOnService.class)
+ public void testConvertsUnserializableElements() throws Exception {
+ List<UnserializableRecord> elements =
+ ImmutableList.of(
+ new UnserializableRecord("foo"),
+ new UnserializableRecord("bar"),
+ new UnserializableRecord("baz"));
+ InProcessCreate<UnserializableRecord> create =
+ InProcessCreate.from(
+ Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder()));
- @Override
- public void verifyDeterministic() throws Coder.NonDeterministicException {
- stringCoder.verifyDeterministic();
- }
- }
+ TestPipeline p = TestPipeline.create();
+ DataflowAssert.that(p.apply(create))
+ .containsInAnyOrder(
+ new UnserializableRecord("foo"),
+ new UnserializableRecord("bar"),
+ new UnserializableRecord("baz"));
+ p.run();
}
@Test
@@ -172,14 +188,15 @@ public class InProcessCreateTest {
new UnserializableRecord("bar"),
new UnserializableRecord("baz"));
InMemorySource<UnserializableRecord> source =
- new InMemorySource<>(elements, new UnserializableRecord.UnserializableRecordCoder());
+ InMemorySource.fromIterable(elements, new UnserializableRecord.UnserializableRecordCoder());
SerializableUtils.ensureSerializable(source);
}
@Test
public void testSplitIntoBundles() throws Exception {
InProcessCreate.InMemorySource<Integer> source =
- new InMemorySource<>(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
+ InMemorySource.fromIterable(
+ ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
PipelineOptions options = PipelineOptionsFactory.create();
List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
assertThat(splitSources, hasSize(3));
@@ -189,7 +206,7 @@ public class InProcessCreateTest {
@Test
public void testDoesNotProduceSortedKeys() throws Exception {
InProcessCreate.InMemorySource<String> source =
- new InMemorySource<>(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
+ InMemorySource.fromIterable(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
assertThat(source.producesSortedKeys(PipelineOptionsFactory.create()), is(false));
}
@@ -197,9 +214,22 @@ public class InProcessCreateTest {
public void testGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
Coder<Integer> coder = VarIntCoder.of();
InProcessCreate.InMemorySource<Integer> source =
- new InMemorySource<>(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
+ InMemorySource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
assertThat(defaultCoder, equalTo(coder));
}
+
+ @Test
+ public void testSplitAtFraction() throws Exception {
+ List<Integer> elements = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < 25; i++) {
+ elements.add(random.nextInt());
+ }
+ InProcessCreate.InMemorySource<Integer> source =
+ InMemorySource.fromIterable(elements, VarIntCoder.of());
+
+ SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
+ }
}