You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/16 18:12:03 UTC
[1/2] incubator-beam git commit: Exercise Dynamic Splitting in the DirectRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master dc94dbdd7 -> 938ac91a9
Exercise Dynamic Splitting in the DirectRunner
For sources whose estimated size exceeds a minimum threshold, the DirectRunner will run a
separate thread that attempts to split off half of the remaining work. This exercises the
concurrent behavior of splitAtFraction and getFractionConsumed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc80ba54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc80ba54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc80ba54
Branch: refs/heads/master
Commit: fc80ba542a7c90221e836fb90a9c3ad984b51670
Parents: dc94dbd
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 1 17:23:03 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 09:38:35 2016 -0800
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 82 ++++++++-
.../runners/direct/StepTransformResult.java | 6 +
.../direct/BoundedReadEvaluatorFactoryTest.java | 184 +++++++++++++++++--
3 files changed, 247 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index add1e8a..8becb91 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -18,10 +18,17 @@
package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,22 +40,35 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
* for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
*/
final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
- private static final Logger LOG = LoggerFactory.getLogger(BoundedReadEvaluatorFactory.class);
+ /**
+ * The required minimum size of a source to dynamically split. Produced {@link TransformEvaluator
+ * TransformEvaluators} will attempt to dynamically split all sources larger than the minimum
+ * dynamic split size.
+ */
+ private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
private final EvaluationContext evaluationContext;
+ private final ExecutorService executor = Executors.newCachedThreadPool();
+
+ private final long minimumDynamicSplitSize;
+ /**
+ * Creates a factory that uses {@link #REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE} as the minimum
+ * estimated source size above which produced evaluators attempt a dynamic split.
+ */
BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
+ this(evaluationContext, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
+ }
+
+ /**
+ * Creates a factory whose evaluators attempt a dynamic split of any source whose estimated size
+ * in bytes is strictly greater than {@code minimumDynamicSplitSize}.
+ *
+ * <p>Tests pass {@link Long#MAX_VALUE} to effectively disable dynamic splitting, or {@code 0}
+ * to attempt it for every source with a positive estimated size.
+ */
+ @VisibleForTesting
+ BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, long minimumDynamicSplitSize) {
this.evaluationContext = evaluationContext;
+ this.minimumDynamicSplitSize = minimumDynamicSplitSize;
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -61,7 +81,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private <OutputT> TransformEvaluator<?> createEvaluator(
final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
- return new BoundedReadEvaluator<>(transform, evaluationContext);
+ return new BoundedReadEvaluator<>(
+ transform, evaluationContext, minimumDynamicSplitSize, executor);
}
@Override
@@ -82,21 +103,29 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext evaluationContext;
private Builder resultBuilder;
+ private final long minimumDynamicSplitSize;
+ private final ExecutorService produceSplitExecutor;
+
public BoundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, ?> transform,
- EvaluationContext evaluationContext) {
+ EvaluationContext evaluationContext,
+ long minimumDynamicSplitSize,
+ ExecutorService executor) {
this.transform = transform;
this.evaluationContext = evaluationContext;
resultBuilder = StepTransformResult.withoutHold(transform);
+ this.minimumDynamicSplitSize = minimumDynamicSplitSize;
+ this.produceSplitExecutor = executor;
}
@Override
public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element)
- throws IOException {
+ throws Exception {
BoundedSource<OutputT> source = element.getValue().getSource();
try (final BoundedReader<OutputT> reader =
source.createReader(evaluationContext.getPipelineOptions())) {
boolean contentsRemaining = reader.start();
+ Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader);
UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
@@ -105,6 +134,28 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
contentsRemaining = reader.advance();
}
resultBuilder.addOutput(output);
+ try {
+ BoundedSource<OutputT> residual = residualFuture.get();
+ if (residual != null) {
+ resultBuilder.addUnprocessedElements(
+ element.withValue(BoundedSourceShard.of(residual)));
+ }
+ } catch (ExecutionException exex) {
+ // Un-and-rewrap the exception thrown by attempting to split
+ throw UserCodeException.wrap(exex.getCause());
+ }
+ }
+ }
+
+ /**
+ * Starts an asynchronous attempt to split off half of {@code reader}'s remaining work, if
+ * {@code source}'s estimated size exceeds the configured minimum dynamic split size.
+ *
+ * <p>The split runs on {@code produceSplitExecutor} while the caller keeps reading from the
+ * same {@code reader}.
+ *
+ * @return a future holding the residual source produced by the split, or {@code null} if no
+ * split was attempted or no residual was produced
+ */
+ private Future<BoundedSource<OutputT>> startDynamicSplitThread(
+ BoundedSource<OutputT> source, BoundedReader<OutputT> reader) throws Exception {
+ if (source.getEstimatedSizeBytes(evaluationContext.getPipelineOptions())
+ > minimumDynamicSplitSize) {
+ return produceSplitExecutor.submit(new GenerateSplitAtHalfwayPoint<>(reader));
+ } else {
+ // Too small to split: hand back an already-completed future with no residual so the
+ // caller can treat both paths uniformly.
+ SettableFuture<BoundedSource<OutputT>> emptyFuture = SettableFuture.create();
+ emptyFuture.set(null);
+ return emptyFuture;
}
}
@@ -159,4 +210,23 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
return shards.build();
}
}
+
+ /**
+ * Attempts to split off the second half of the work a {@link BoundedReader} has not yet consumed.
+ *
+ * <p>Intended to run concurrently with the thread reading from the same reader, which exercises
+ * the concurrent behavior of {@link BoundedReader#getFractionConsumed()} and
+ * {@link BoundedReader#splitAtFraction(double)}.
+ */
+ private static class GenerateSplitAtHalfwayPoint<T> implements Callable<BoundedSource<T>> {
+ private final BoundedReader<T> reader;
+
+ private GenerateSplitAtHalfwayPoint(BoundedReader<T> reader) {
+ this.reader = reader;
+ }
+
+ /**
+ * Splits at the midpoint between the reader's current progress and completion.
+ *
+ * @return the result of {@code splitAtFraction} at that midpoint, or {@code null} without
+ * attempting a split if progress is unknown or already complete
+ */
+ @Override
+ public BoundedSource<T> call() throws Exception {
+ Double currentlyConsumed = reader.getFractionConsumed();
+ // Progress is a floating-point estimate: treat anything at or beyond 1.0 as "nothing left to
+ // split" instead of relying on exact equality with 1.0.
+ if (currentlyConsumed == null || currentlyConsumed >= 1.0) {
+ return null;
+ }
+ // Midpoint of [currentlyConsumed, 1.0], i.e. halfway through the remaining work.
+ double halfwayBetweenCurrentAndCompletion = 0.5 + (currentlyConsumed / 2);
+ return reader.splitAtFraction(halfwayBetweenCurrentAndCompletion);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 989109f..5719e44 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
@@ -117,6 +118,11 @@ public abstract class StepTransformResult implements TransformResult {
return this;
}
+ /**
+ * Adds every given element to the unprocessed elements of the result under construction.
+ * Behaves exactly like {@link #addUnprocessedElements(Iterable)} applied to the same elements.
+ */
+ public Builder addUnprocessedElements(WindowedValue<?>... unprocessed) {
+ return addUnprocessedElements(Arrays.asList(unprocessed));
+ }
+
public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) {
unprocessedElementsBuilder.addAll(unprocessed);
return this;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 8a76a53..9d8503a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@@ -36,14 +37,17 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -83,7 +87,9 @@ public class BoundedReadEvaluatorFactoryTest {
TestPipeline p = TestPipeline.create();
longs = p.apply(Read.from(source));
- factory = new BoundedReadEvaluatorFactory(context);
+ factory =
+ new BoundedReadEvaluatorFactory(
+ context, Long.MAX_VALUE /* minimum size for dynamic splits */);
bundleFactory = ImmutableListBundleFactory.create();
}
@@ -123,6 +129,108 @@ public class BoundedReadEvaluatorFactoryTest {
}
@Test
+ public void boundedSourceEvaluatorProducesDynamicSplits() throws Exception {
+ // A minimum split size of 0 makes every source with a positive estimated size eligible.
+ BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L);
+
+ when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+ int numElements = 10;
+ Long[] elems = new Long[numElements];
+ for (int i = 0; i < numElements; i++) {
+ elems[i] = (long) i;
+ }
+ // The test reader blocks at index 5 until two subranges of its source have been created
+ // (i.e. it has been dynamically split), so the read cannot finish in a single pass.
+ PCollection<Long> read =
+ TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
+ AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
+ Collection<CommittedBundle<?>> unreadInputs =
+ new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
+
+ Collection<WindowedValue<?>> outputs = new ArrayList<>();
+ int numIterations = 0;
+ // Keep evaluating until no split-off (unprocessed) shards remain.
+ while (!unreadInputs.isEmpty()) {
+ numIterations++;
+ UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(read);
+ when(context.createBundle(read)).thenReturn(outputBundle);
+
+ Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>();
+ for (CommittedBundle<?> shardBundle : unreadInputs) {
+ TransformEvaluator<?> evaluator = factory.forApplication(transform, null);
+ for (WindowedValue<?> shard : shardBundle.getElements()) {
+ evaluator.processElement((WindowedValue) shard);
+ }
+ TransformResult result = evaluator.finishBundle();
+ assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ assertThat(
+ Iterables.size(result.getOutputBundles()),
+ equalTo(Iterables.size(shardBundle.getElements())));
+ for (UncommittedBundle<?> output : result.getOutputBundles()) {
+ CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ for (WindowedValue<?> val : committed.getElements()) {
+ outputs.add(val);
+ }
+ }
+ if (!Iterables.isEmpty(result.getUnprocessedElements())) {
+ newUnreadInputs.add(shardBundle.withElements((Iterable) result.getUnprocessedElements()));
+ }
+ }
+ unreadInputs = newUnreadInputs;
+ }
+
+ // The reader could only get past index 5 once its source was split, so the first pass must
+ // have left a residual shard behind, forcing at least a second pass.
+ assertThat(numIterations, greaterThan(1));
+ WindowedValue[] expectedValues = new WindowedValue[numElements];
+ for (long i = 0L; i < numElements; i++) {
+ expectedValues[(int) i] = gw(i);
+ }
+ assertThat(outputs, Matchers.<WindowedValue<?>>containsInAnyOrder(expectedValues));
+ }
+
+ @Test
+ public void boundedSourceEvaluatorDynamicSplitsUnsplittable() throws Exception {
+ // Dynamic splitting is attempted for every source, but this source refuses to be split, so
+ // all of its elements must be produced in a single pass with no residual.
+ BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L);
+
+ PCollection<Long> read =
+ TestPipeline.create()
+ .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
+ AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
+
+ when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+ Collection<CommittedBundle<?>> initialInputs =
+ new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
+
+ UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(read);
+ when(context.createBundle(read)).thenReturn(outputBundle);
+ List<WindowedValue<?>> outputs = new ArrayList<>();
+ for (CommittedBundle<?> shardBundle : initialInputs) {
+ TransformEvaluator<?> evaluator = factory.forApplication(transform, null);
+ for (WindowedValue<?> shard : shardBundle.getElements()) {
+ evaluator.processElement((WindowedValue) shard);
+ }
+ TransformResult result = evaluator.finishBundle();
+ assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ assertThat(
+ Iterables.size(result.getOutputBundles()),
+ equalTo(Iterables.size(shardBundle.getElements())));
+ // An unsplittable source must never hand back a residual to re-process.
+ assertThat(Iterables.isEmpty(result.getUnprocessedElements()), is(true));
+ for (UncommittedBundle<?> output : result.getOutputBundles()) {
+ CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ for (WindowedValue<?> val : committed.getElements()) {
+ outputs.add(val);
+ }
+ }
+ }
+
+ assertThat(
+ outputs,
+ Matchers.<WindowedValue<?>>containsInAnyOrder(
+ gw(0L), gw(1L), gw(2L), gw(3L), gw(4L), gw(5L), gw(6L), gw(7L), gw(8L), gw(9L)));
+ }
+
+ @Test
public void getInitialInputsSplitsIntoBundles() throws Exception {
when(context.createRootBundle())
.thenAnswer(
@@ -231,26 +339,37 @@ public class BoundedReadEvaluatorFactoryTest {
assertThat(TestSource.readerClosed, is(true));
}
- private static class TestSource<T> extends BoundedSource<T> {
+ private static class TestSource<T> extends OffsetBasedSource<T> {
private static boolean readerClosed;
private final Coder<T> coder;
private final T[] elems;
+ private final int awaitSplitIndex;
+
+ private transient CountDownLatch subrangesCompleted;
public TestSource(Coder<T> coder, T... elems) {
+ this(coder, elems.length, elems);
+ }
+
+ public TestSource(Coder<T> coder, int awaitSplitIndex, T... elems) {
+ super(0L, elems.length, 1L);
this.elems = elems;
this.coder = coder;
+ this.awaitSplitIndex = awaitSplitIndex;
readerClosed = false;
+
+ subrangesCompleted = new CountDownLatch(2);
}
@Override
- public List<? extends BoundedSource<T>> splitIntoBundles(
+ public List<? extends OffsetBasedSource<T>> splitIntoBundles(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
return ImmutableList.of(this);
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 0;
+ return elems.length;
}
@Override
@@ -260,7 +379,8 @@ public class BoundedReadEvaluatorFactoryTest {
@Override
public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- return new TestReader<>(this, elems);
+ subrangesCompleted = new CountDownLatch(2);
+ return new TestReader<>(this, awaitSplitIndex, subrangesCompleted);
}
@Override
@@ -268,35 +388,61 @@ public class BoundedReadEvaluatorFactoryTest {
}
@Override
+ public long getMaxEndOffset(PipelineOptions options) throws Exception {
+ // Offsets are element indices, so the last valid end offset is the element count.
+ return elems.length;
+ }
+
+ @Override
+ public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+ // Signal that a subrange was produced; a reader created by this source may be waiting on
+ // this latch before it continues reading.
+ subrangesCompleted.countDown();
+ T[] subrangeElems = Arrays.copyOfRange(elems, (int) start, (int) end);
+ return new TestSource<>(coder, subrangeElems);
+ }
+
+ @Override
public Coder<T> getDefaultOutputCoder() {
return coder;
}
}
- private static class TestReader<T> extends BoundedReader<T> {
- private final BoundedSource<T> source;
- private final List<T> elems;
+ private static class TestReader<T> extends OffsetBasedReader<T> {
+ private final int sleepIndex;
+ private final CountDownLatch dynamicallySplit;
+
private int index;
- public TestReader(BoundedSource<T> source, T... elems) {
- this.source = source;
- this.elems = Arrays.asList(elems);
+ TestReader(OffsetBasedSource<T> source, int sleepIndex, CountDownLatch dynamicallySplit) {
+ super(source);
+ this.sleepIndex = sleepIndex;
+ this.dynamicallySplit = dynamicallySplit;
this.index = -1;
}
@Override
- public BoundedSource<T> getCurrentSource() {
- return source;
+ public TestSource<T> getCurrentSource() {
+ return (TestSource<T>) super.getCurrentSource();
}
@Override
- public boolean start() throws IOException {
- return advance();
+ protected long getCurrentOffset() throws NoSuchElementException {
+ return (long) index;
}
@Override
- public boolean advance() throws IOException {
- if (elems.size() > index + 1) {
+ public boolean startImpl() throws IOException {
+ return advanceImpl();
+ }
+
+ @Override
+ public boolean advanceImpl() throws IOException {
+ if (index == sleepIndex) {
+ try {
+ dynamicallySplit.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+ if (getCurrentSource().elems.length > index + 1) {
index++;
return true;
}
@@ -305,7 +451,7 @@ public class BoundedReadEvaluatorFactoryTest {
@Override
public T getCurrent() throws NoSuchElementException {
- return elems.get(index);
+ return getCurrentSource().elems[index];
}
@Override
[2/2] incubator-beam git commit: This closes #1254
Posted by tg...@apache.org.
This closes #1254
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/938ac91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/938ac91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/938ac91a
Branch: refs/heads/master
Commit: 938ac91a91eb886b013e9e54899a6afd07fe3d9e
Parents: dc94dbd fc80ba5
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 16 09:38:36 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 09:38:36 2016 -0800
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 82 ++++++++-
.../runners/direct/StepTransformResult.java | 6 +
.../direct/BoundedReadEvaluatorFactoryTest.java | 184 +++++++++++++++++--
3 files changed, 247 insertions(+), 25 deletions(-)
----------------------------------------------------------------------