You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/16 18:12:03 UTC

[1/2] incubator-beam git commit: Exercise Dynamic Splitting in the DirectRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master dc94dbdd7 -> 938ac91a9


Exercise Dynamic Splitting in the DirectRunner

For sources whose estimated size is above a minimum threshold, the
DirectRunner will run a separate thread which splits off half of the
remaining work. This exercises the concurrent behavior of
splitAtFraction and getFractionConsumed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc80ba54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc80ba54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc80ba54

Branch: refs/heads/master
Commit: fc80ba542a7c90221e836fb90a9c3ad984b51670
Parents: dc94dbd
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 1 17:23:03 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 09:38:35 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     |  82 ++++++++-
 .../runners/direct/StepTransformResult.java     |   6 +
 .../direct/BoundedReadEvaluatorFactoryTest.java | 184 +++++++++++++++++--
 3 files changed, 247 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index add1e8a..8becb91 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -18,10 +18,17 @@
 package org.apache.beam.runners.direct;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,22 +40,35 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
  * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
  */
 final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(BoundedReadEvaluatorFactory.class);
+  /**
+   * The required minimum size of a source to dynamically split. Produced {@link TransformEvaluator
+   * TransformEvaluators} will attempt to dynamically split all sources larger than the minimum
+   * dynamic split size.
+   */
+  private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
   private final EvaluationContext evaluationContext;
+  private final ExecutorService executor = Executors.newCachedThreadPool();
+
+  private final long minimumDynamicSplitSize;
 
   BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
+    this(evaluationContext, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
+  }
+
+  @VisibleForTesting
+  BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, long minimumDynamicSplitSize) {
     this.evaluationContext = evaluationContext;
+    this.minimumDynamicSplitSize = minimumDynamicSplitSize;
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -61,7 +81,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
 
   private <OutputT> TransformEvaluator<?> createEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
-    return new BoundedReadEvaluator<>(transform, evaluationContext);
+    return new BoundedReadEvaluator<>(
+        transform, evaluationContext, minimumDynamicSplitSize, executor);
   }
 
   @Override
@@ -82,21 +103,29 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     private final EvaluationContext evaluationContext;
     private Builder resultBuilder;
 
+    private final long minimumDynamicSplitSize;
+    private final ExecutorService produceSplitExecutor;
+
     public BoundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, ?> transform,
-        EvaluationContext evaluationContext) {
+        EvaluationContext evaluationContext,
+        long minimumDynamicSplitSize,
+        ExecutorService executor) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
       resultBuilder = StepTransformResult.withoutHold(transform);
+      this.minimumDynamicSplitSize = minimumDynamicSplitSize;
+      this.produceSplitExecutor = executor;
     }
 
     @Override
     public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element)
-        throws IOException {
+        throws Exception {
       BoundedSource<OutputT> source = element.getValue().getSource();
       try (final BoundedReader<OutputT> reader =
           source.createReader(evaluationContext.getPipelineOptions())) {
         boolean contentsRemaining = reader.start();
+        Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader);
         UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
         while (contentsRemaining) {
           output.add(
@@ -105,6 +134,28 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           contentsRemaining = reader.advance();
         }
         resultBuilder.addOutput(output);
+        try {
+          BoundedSource<OutputT> residual = residualFuture.get();
+          if (residual != null) {
+            resultBuilder.addUnprocessedElements(
+                element.withValue(BoundedSourceShard.of(residual)));
+          }
+        } catch (ExecutionException exex) {
+          // Un-and-rewrap the exception thrown by attempting to split
+          throw UserCodeException.wrap(exex.getCause());
+        }
+      }
+    }
+
+    private Future<BoundedSource<OutputT>> startDynamicSplitThread(
+        BoundedSource<OutputT> source, BoundedReader<OutputT> reader) throws Exception {
+      if (source.getEstimatedSizeBytes(evaluationContext.getPipelineOptions())
+          > minimumDynamicSplitSize) {
+        return produceSplitExecutor.submit(new GenerateSplitAtHalfwayPoint<>(reader));
+      } else {
+        SettableFuture<BoundedSource<OutputT>> emptyFuture = SettableFuture.create();
+        emptyFuture.set(null);
+        return emptyFuture;
       }
     }
 
@@ -159,4 +210,23 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       return shards.build();
     }
   }
+
+  private static class GenerateSplitAtHalfwayPoint<T> implements Callable<BoundedSource<T>> {
+    private final BoundedReader<T> reader;
+
+    private GenerateSplitAtHalfwayPoint(BoundedReader<T> reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public BoundedSource<T> call() throws Exception {
+      // Splits at halfway of the remaining work.
+      Double currentlyConsumed = reader.getFractionConsumed();
+      if (currentlyConsumed == null || currentlyConsumed == 1.0) {
+        return null;
+      }
+      double halfwayBetweenCurrentAndCompletion = 0.5 + (currentlyConsumed / 2);
+      return reader.splitAtFraction(halfwayBetweenCurrentAndCompletion);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 989109f..5719e44 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
@@ -117,6 +118,11 @@ public abstract class StepTransformResult implements TransformResult {
       return this;
     }
 
+    public Builder addUnprocessedElements(WindowedValue<?>... unprocessed) {
+      unprocessedElementsBuilder.addAll(Arrays.asList(unprocessed));
+      return this;
+    }
+
     public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) {
       unprocessedElementsBuilder.addAll(unprocessed);
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 8a76a53..9d8503a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -36,14 +37,17 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
 import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -83,7 +87,9 @@ public class BoundedReadEvaluatorFactoryTest {
     TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
-    factory = new BoundedReadEvaluatorFactory(context);
+    factory =
+        new BoundedReadEvaluatorFactory(
+            context, Long.MAX_VALUE /* minimum size for dynamic splits */);
     bundleFactory = ImmutableListBundleFactory.create();
   }
 
@@ -123,6 +129,108 @@ public class BoundedReadEvaluatorFactoryTest {
   }
 
   @Test
+  public void boundedSourceEvaluatorProducesDynamicSplits() throws Exception {
+    BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L);
+
+    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    int numElements = 10;
+    Long[] elems = new Long[numElements];
+    for (int i = 0; i < numElements; i++) {
+      elems[i] = (long) i;
+    }
+    PCollection<Long> read =
+        TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
+    AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
+    Collection<CommittedBundle<?>> unreadInputs =
+        new BoundedReadEvaluatorFactory.InputProvider(context)
+            .getInitialInputs(transform,
+                1);
+
+    Collection<WindowedValue<?>> outputs = new ArrayList<>();
+    int numIterations = 0;
+    while (!unreadInputs.isEmpty()) {
+      numIterations++;
+      UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(read);
+      when(context.createBundle(read)).thenReturn(outputBundle);
+
+      Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>();
+      for (CommittedBundle<?> shardBundle : unreadInputs) {
+        TransformEvaluator<?> evaluator =
+            factory.forApplication(transform, null);
+        for (WindowedValue<?> shard : shardBundle.getElements()) {
+          evaluator.processElement((WindowedValue) shard);
+        }
+        TransformResult result = evaluator.finishBundle();
+        assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+        assertThat(
+            Iterables.size(result.getOutputBundles()),
+            equalTo(Iterables.size(shardBundle.getElements())));
+        for (UncommittedBundle<?> output : result.getOutputBundles()) {
+          CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+          for (WindowedValue<?> val : committed.getElements()) {
+            outputs.add(val);
+          }
+        }
+        if (!Iterables.isEmpty(result.getUnprocessedElements())) {
+          newUnreadInputs.add(shardBundle.withElements((Iterable) result.getUnprocessedElements()));
+        }
+      }
+      unreadInputs = newUnreadInputs;
+    }
+
+    // We produced at least one split before we read 1000 elements, as we will attempt to split as
+    // quickly as possible.
+    assertThat(numIterations, greaterThan(1));
+    WindowedValue[] expectedValues = new WindowedValue[numElements];
+    for (long i = 0L; i < numElements; i++) {
+      expectedValues[(int) i] = gw(i);
+    }
+    assertThat(outputs, Matchers.<WindowedValue<?>>containsInAnyOrder(expectedValues));
+  }
+
+  @Test
+  public void boundedSourceEvaluatorDynamicSplitsUnsplittable() throws Exception {
+    BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L);
+
+    PCollection<Long> read =
+        TestPipeline.create()
+            .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
+    AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
+
+    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    Collection<CommittedBundle<?>> initialInputs =
+        new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
+
+    UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(read);
+    when(context.createBundle(read)).thenReturn(outputBundle);
+    List<WindowedValue<?>> outputs = new ArrayList<>();
+    for (CommittedBundle<?> shardBundle : initialInputs) {
+      TransformEvaluator<?> evaluator =
+          factory.forApplication(transform, null);
+      for (WindowedValue<?> shard : shardBundle.getElements()) {
+        evaluator.processElement((WindowedValue) shard);
+      }
+      TransformResult result = evaluator.finishBundle();
+      assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+      assertThat(
+          Iterables.size(result.getOutputBundles()),
+          equalTo(Iterables.size(shardBundle.getElements())));
+      for (UncommittedBundle<?> output : result.getOutputBundles()) {
+        CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        for (WindowedValue<?> val : committed.getElements()) {
+          outputs.add(val);
+        }
+      }
+    }
+
+    assertThat(
+        outputs,
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
+  }
+
+  @Test
   public void getInitialInputsSplitsIntoBundles() throws Exception {
     when(context.createRootBundle())
         .thenAnswer(
@@ -231,26 +339,37 @@ public class BoundedReadEvaluatorFactoryTest {
     assertThat(TestSource.readerClosed, is(true));
   }
 
-  private static class TestSource<T> extends BoundedSource<T> {
+  private static class TestSource<T> extends OffsetBasedSource<T> {
     private static boolean readerClosed;
     private final Coder<T> coder;
     private final T[] elems;
+    private final int awaitSplitIndex;
+
+    private transient CountDownLatch subrangesCompleted;
 
     public TestSource(Coder<T> coder, T... elems) {
+      this(coder, elems.length, elems);
+    }
+
+    public TestSource(Coder<T> coder, int awaitSplitIndex, T... elems) {
+      super(0L, elems.length, 1L);
       this.elems = elems;
       this.coder = coder;
+      this.awaitSplitIndex = awaitSplitIndex;
       readerClosed = false;
+
+      subrangesCompleted = new CountDownLatch(2);
     }
 
     @Override
-    public List<? extends BoundedSource<T>> splitIntoBundles(
+    public List<? extends OffsetBasedSource<T>> splitIntoBundles(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       return ImmutableList.of(this);
     }
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return 0;
+      return elems.length;
     }
 
     @Override
@@ -260,7 +379,8 @@ public class BoundedReadEvaluatorFactoryTest {
 
     @Override
     public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-      return new TestReader<>(this, elems);
+      subrangesCompleted = new CountDownLatch(2);
+      return new TestReader<>(this, awaitSplitIndex, subrangesCompleted);
     }
 
     @Override
@@ -268,35 +388,61 @@ public class BoundedReadEvaluatorFactoryTest {
     }
 
     @Override
+    public long getMaxEndOffset(PipelineOptions options) throws Exception {
+      return elems.length;
+    }
+
+    @Override
+    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+      subrangesCompleted.countDown();
+      return new TestSource<>(coder, Arrays.copyOfRange(elems, (int) start, (int) end));
+    }
+
+    @Override
     public Coder<T> getDefaultOutputCoder() {
       return coder;
     }
   }
 
-  private static class TestReader<T> extends BoundedReader<T> {
-    private final BoundedSource<T> source;
-    private final List<T> elems;
+  private static class TestReader<T> extends OffsetBasedReader<T> {
+    private final int sleepIndex;
+    private final CountDownLatch dynamicallySplit;
+
     private int index;
 
-    public TestReader(BoundedSource<T> source, T... elems) {
-      this.source = source;
-      this.elems = Arrays.asList(elems);
+    TestReader(OffsetBasedSource<T> source, int sleepIndex, CountDownLatch dynamicallySplit) {
+      super(source);
+      this.sleepIndex = sleepIndex;
+      this.dynamicallySplit = dynamicallySplit;
       this.index = -1;
     }
 
     @Override
-    public BoundedSource<T> getCurrentSource() {
-      return source;
+    public TestSource<T> getCurrentSource() {
+      return (TestSource<T>) super.getCurrentSource();
     }
 
     @Override
-    public boolean start() throws IOException {
-      return advance();
+    protected long getCurrentOffset() throws NoSuchElementException {
+      return (long) index;
     }
 
     @Override
-    public boolean advance() throws IOException {
-      if (elems.size() > index + 1) {
+    public boolean startImpl() throws IOException {
+      return advanceImpl();
+    }
+
+    @Override
+    public boolean advanceImpl() throws IOException {
+      if (index == sleepIndex) {
+        try {
+          dynamicallySplit.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        }
+      }
+      if (getCurrentSource().elems.length > index + 1) {
         index++;
         return true;
       }
@@ -305,7 +451,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     @Override
     public T getCurrent() throws NoSuchElementException {
-      return elems.get(index);
+      return getCurrentSource().elems[index];
     }
 
     @Override


[2/2] incubator-beam git commit: This closes #1254

Posted by tg...@apache.org.
This closes #1254


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/938ac91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/938ac91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/938ac91a

Branch: refs/heads/master
Commit: 938ac91a91eb886b013e9e54899a6afd07fe3d9e
Parents: dc94dbd fc80ba5
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 16 09:38:36 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 09:38:36 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     |  82 ++++++++-
 .../runners/direct/StepTransformResult.java     |   6 +
 .../direct/BoundedReadEvaluatorFactoryTest.java | 184 +++++++++++++++++--
 3 files changed, 247 insertions(+), 25 deletions(-)
----------------------------------------------------------------------