You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/06 23:25:39 UTC

[2/2] incubator-beam git commit: Implement InProcessCreate with an OffsetBasedSource

Implement InProcessCreate with an OffsetBasedSource

This allows dynamic splitting (splitAtFraction) to be exercised on the in-memory source that backs Create in the in-process runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/788611d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/788611d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/788611d6

Branch: refs/heads/master
Commit: 788611d6997f297f2e795dd6dda88e29fe055368
Parents: 15a8334
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 18 11:00:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 6 14:25:09 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/inprocess/InProcessCreate.java  | 147 ++++++++++---------
 .../runners/inprocess/InProcessCreateTest.java  |  62 ++++++--
 2 files changed, 123 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/788611d6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
index efda8fc..4dbc34d 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
@@ -21,6 +21,8 @@ import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
+import com.google.cloud.dataflow.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -32,12 +34,8 @@ import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
 
@@ -74,7 +72,7 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
     }
     InMemorySource<T> source;
     try {
-      source = new InMemorySource<>(original.getElements(), elementCoder);
+      source = InMemorySource.fromIterable(original.getElements(), elementCoder);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -89,56 +87,35 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
   }
 
   @VisibleForTesting
-  static class InMemorySource<T> extends BoundedSource<T> {
-    private final Collection<byte[]> allElementsBytes;
+  static class InMemorySource<T> extends OffsetBasedSource<T> {
+    private final List<byte[]> allElementsBytes;
     private final long totalSize;
     private final Coder<T> coder;
 
-    public InMemorySource(Iterable<T> elements, Coder<T> elemCoder)
+    public static <T> InMemorySource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
         throws CoderException, IOException {
-      allElementsBytes = new ArrayList<>();
+      ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
       long totalSize = 0L;
       for (T element : elements) {
         byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
         allElementsBytes.add(bytes);
         totalSize += bytes.length;
       }
-      this.totalSize = totalSize;
-      this.coder = elemCoder;
+      return new InMemorySource<>(allElementsBytes.build(), totalSize, elemCoder);
     }
 
     /**
      * Create a new source with the specified bytes. The new source owns the input element bytes,
      * which must not be modified after this constructor is called.
      */
-    private InMemorySource(Collection<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+    private InMemorySource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+      super(0, elementBytes.size(), 1);
       this.allElementsBytes = ImmutableList.copyOf(elementBytes);
       this.totalSize = totalSize;
       this.coder = coder;
     }
 
     @Override
-    public List<? extends BoundedSource<T>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      ImmutableList.Builder<InMemorySource<T>> resultBuilder = ImmutableList.builder();
-      long currentSourceSize = 0L;
-      List<byte[]> currentElems = new ArrayList<>();
-      for (byte[] elemBytes : allElementsBytes) {
-        currentElems.add(elemBytes);
-        currentSourceSize += elemBytes.length;
-        if (currentSourceSize >= desiredBundleSizeBytes) {
-          resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
-          currentElems.clear();
-          currentSourceSize = 0L;
-        }
-      }
-      if (!currentElems.isEmpty()) {
-        resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
-      }
-      return resultBuilder.build();
-    }
-
-    @Override
     public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
       return totalSize;
     }
@@ -150,7 +127,7 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
 
     @Override
     public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-      return new BytesReader();
+      return new BytesReader<>(this);
     }
 
     @Override
@@ -161,50 +138,80 @@ class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
       return coder;
     }
 
-    private class BytesReader extends BoundedReader<T> {
-      private final PeekingIterator<byte[]> iter;
-      /**
-       * Use an optional to distinguish between null next element (as Optional.absent()) and no next
-       * element (next is null).
-       */
-      @Nullable private Optional<T> next;
+    @Override
+    public long getMaxEndOffset(PipelineOptions options) throws Exception {
+      return allElementsBytes.size();
+    }
 
-      public BytesReader() {
-        this.iter = Iterators.peekingIterator(allElementsBytes.iterator());
-      }
+    @Override
+    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+      List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
+      long primarySizeEstimate =
+          (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
+      return new InMemorySource<>(primaryElems, primarySizeEstimate, coder);
+    }
 
-      @Override
-      public BoundedSource<T> getCurrentSource() {
-        return InMemorySource.this;
+    @Override
+    public long getBytesPerOffset() {
+      if (allElementsBytes.size() == 0) {
+        return 0L;
       }
+      return totalSize / allElementsBytes.size();
+    }
+  }
 
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
+  private static class BytesReader<T> extends OffsetBasedReader<T> {
+    private int index;
+    /**
+     * Use an optional to distinguish between null next element (as Optional.absent()) and no next
+     * element (next is null).
+     */
+    @Nullable private Optional<T> next;
 
-      @Override
-      public boolean advance() throws IOException {
-        boolean hasNext = iter.hasNext();
-        if (hasNext) {
-          next = Optional.fromNullable(CoderUtils.decodeFromByteArray(coder, iter.next()));
-        } else {
-          next = null;
-        }
-        return hasNext;
-      }
+    public BytesReader(InMemorySource<T> source) {
+      super(source);
+      index = -1;
+    }
 
-      @Override
-      @Nullable
-      public T getCurrent() throws NoSuchElementException {
-        if (next == null) {
-          throw new NoSuchElementException();
-        }
-        return next.orNull();
+    @Override
+    @Nullable
+    public T getCurrent() throws NoSuchElementException {
+      if (next == null) {
+        throw new NoSuchElementException();
       }
+      return next.orNull();
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    protected long getCurrentOffset() {
+      return index;
+    }
 
-      @Override
-      public void close() throws IOException {}
+    @Override
+    protected boolean startImpl() throws IOException {
+      return advanceImpl();
+    }
+
+    @Override
+    public synchronized InMemorySource<T> getCurrentSource() {
+      return (InMemorySource<T>) super.getCurrentSource();
+    }
+
+    @Override
+    protected boolean advanceImpl() throws IOException {
+      InMemorySource<T> source = getCurrentSource();
+      index++;
+      if (index >= source.allElementsBytes.size()) {
+        return false;
+      }
+      next =
+          Optional.fromNullable(
+              CoderUtils.decodeFromByteArray(
+                  source.coder, source.allElementsBytes.get(index)));
+      return true;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/788611d6/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
index 227c388..1956b20 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
@@ -24,11 +24,11 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
 import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.NullableCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
@@ -36,6 +36,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessCreate.InMemorySource;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
 import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -46,6 +47,7 @@ import com.google.common.collect.ImmutableList;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -54,8 +56,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 /**
  * Tests for {@link InProcessCreate}.
@@ -66,6 +69,7 @@ public class InProcessCreateTest {
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
+  @Category(RunnableOnService.class)
   public void testConvertsCreate() {
     TestPipeline p = TestPipeline.create();
     Create.Values<Integer> og = Create.of(1, 2, 3);
@@ -78,6 +82,7 @@ public class InProcessCreateTest {
   }
 
   @Test
+  @Category(RunnableOnService.class)
   public void testConvertsCreateWithNullElements() {
     Create.Values<String> og =
         Create.<String>of("foo", null, "spam", "ham", null, "eggs")
@@ -133,7 +138,7 @@ public class InProcessCreateTest {
       return myString.equals(((UnserializableRecord) o).myString);
     }
 
-    static class UnserializableRecordCoder extends StandardCoder<UnserializableRecord> {
+    static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
       private final Coder<String> stringCoder = StringUtf8Coder.of();
 
       @Override
@@ -151,17 +156,28 @@ public class InProcessCreateTest {
           throws CoderException, IOException {
         return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
       }
+    }
+  }
 
-      @Override
-      public List<? extends Coder<?>> getCoderArguments() {
-        return Collections.emptyList();
-      }
+  @Test
+  @Category(RunnableOnService.class)
+  public void testConvertsUnserializableElements() throws Exception {
+    List<UnserializableRecord> elements =
+        ImmutableList.of(
+            new UnserializableRecord("foo"),
+            new UnserializableRecord("bar"),
+            new UnserializableRecord("baz"));
+    InProcessCreate<UnserializableRecord> create =
+        InProcessCreate.from(
+            Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder()));
 
-      @Override
-      public void verifyDeterministic() throws Coder.NonDeterministicException {
-        stringCoder.verifyDeterministic();
-      }
-    }
+    TestPipeline p = TestPipeline.create();
+    DataflowAssert.that(p.apply(create))
+        .containsInAnyOrder(
+            new UnserializableRecord("foo"),
+            new UnserializableRecord("bar"),
+            new UnserializableRecord("baz"));
+    p.run();
   }
 
   @Test
@@ -172,14 +188,15 @@ public class InProcessCreateTest {
             new UnserializableRecord("bar"),
             new UnserializableRecord("baz"));
     InMemorySource<UnserializableRecord> source =
-        new InMemorySource<>(elements, new UnserializableRecord.UnserializableRecordCoder());
+        InMemorySource.fromIterable(elements, new UnserializableRecord.UnserializableRecordCoder());
     SerializableUtils.ensureSerializable(source);
   }
 
   @Test
   public void testSplitIntoBundles() throws Exception {
     InProcessCreate.InMemorySource<Integer> source =
-        new InMemorySource<>(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
+        InMemorySource.fromIterable(
+            ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
     PipelineOptions options = PipelineOptionsFactory.create();
     List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
     assertThat(splitSources, hasSize(3));
@@ -189,7 +206,7 @@ public class InProcessCreateTest {
   @Test
   public void testDoesNotProduceSortedKeys() throws Exception {
     InProcessCreate.InMemorySource<String> source =
-        new InMemorySource<>(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
+        InMemorySource.fromIterable(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
     assertThat(source.producesSortedKeys(PipelineOptionsFactory.create()), is(false));
   }
 
@@ -197,9 +214,22 @@ public class InProcessCreateTest {
   public void testGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
     Coder<Integer> coder = VarIntCoder.of();
     InProcessCreate.InMemorySource<Integer> source =
-        new InMemorySource<>(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
+        InMemorySource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
 
     Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
     assertThat(defaultCoder, equalTo(coder));
   }
+
+  @Test
+  public void testSplitAtFraction() throws Exception {
+    List<Integer> elements = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < 25; i++) {
+      elements.add(random.nextInt());
+    }
+    InProcessCreate.InMemorySource<Integer> source =
+        InMemorySource.fromIterable(elements, VarIntCoder.of());
+
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
+  }
 }