You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/30 05:45:57 UTC
[1/2] incubator-beam git commit: Copy and use
UnboundedReadFromBoundedSource in dataflow runner
Repository: incubator-beam
Updated Branches:
refs/heads/master e01efbda3 -> 12b6ff8d7
Copy and use UnboundedReadFromBoundedSource in dataflow runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f2ac394
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f2ac394
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f2ac394
Branch: refs/heads/master
Commit: 4f2ac394c207e0807042013fcab75296d8cf61df
Parents: e01efbd
Author: Pei He <pe...@google.com>
Authored: Mon Jun 27 17:29:14 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 29 22:45:44 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 34 +-
.../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
.../runners/dataflow/DataflowRunnerTest.java | 30 -
3 files changed, 577 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 70dd94f..2ba6c7b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTran
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.AssignWindows;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
@@ -60,6 +61,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.PubsubUnboundedSink;
@@ -350,11 +352,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
builder.put(Write.Bound.class, StreamingWrite.class);
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
- builder.put(Read.Bounded.class, UnsupportedIO.class);
- builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
+ builder.put(Read.Bounded.class, StreamingBoundedRead.class);
builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
- builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
- builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
// In streaming mode must use either the custom Pubsub unbounded source/sink or
@@ -2366,6 +2365,33 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
+ * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
+ * Dataflow runner in streaming mode.
+ */
+ private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>> {
+   // The bounded source taken from the overridden Read.Bounded transform.
+   private final BoundedSource<T> boundedSource;
+
+   /**
+    * Creates the override from the original transform; only its source is retained.
+    *
+    * @param runner the runner requesting the override (not used by this class)
+    * @param transform the {@code Read.Bounded} transform being replaced
+    */
+   @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
+   public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
+     boundedSource = transform.getSource();
+   }
+
+   @Override
+   protected Coder<T> getDefaultOutputCoder() {
+     return boundedSource.getDefaultOutputCoder();
+   }
+
+   /**
+    * Validates the source, reads it through {@link DataflowUnboundedReadFromBoundedSource}, and
+    * marks the resulting collection as bounded.
+    */
+   @Override
+   public final PCollection<T> apply(PInput input) {
+     boundedSource.validate();
+
+     DataflowUnboundedReadFromBoundedSource<T> unboundedRead =
+         new DataflowUnboundedReadFromBoundedSource<>(boundedSource);
+     PCollection<T> output = Pipeline.applyTransform(input, unboundedRead);
+     return output.setIsBoundedInternal(IsBounded.BOUNDED);
+   }
+ }
+
+ /**
* Specialized implementation for
* {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the
* Dataflow runner in streaming mode.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..5e035bc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} into an {@link UnboundedSource}.
+ *
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * and element timestamps are propagated. While any elements remain, the watermark is the beginning
+ * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
+ * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
+ * {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ *
+ * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
+ * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
+ */
+@Deprecated
+public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
+
+ private final BoundedSource<T> source;
+
+ /**
+ * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+ */
+ public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
+ this.source = source;
+ }
+
+ @Override
+ public PCollection<T> apply(PInput input) {
+   // Wrap the bounded source in the unbounded adapter, then read it with the Read transform.
+   BoundedToUnboundedSourceAdapter<T> adapter = new BoundedToUnboundedSourceAdapter<>(source);
+   return input.getPipeline().apply(Read.from(adapter));
+ }
+
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return source.getDefaultOutputCoder();
+ }
+
+ @Override
+ public String getKindString() {
+ return "Read(" + approximateSimpleName(source.getClass()) + ")";
+ }
+
+ /**
+  * Registers display data for this transform: an item named "source" holding the wrapped
+  * source's class, followed by the wrapped source's own display data.
+  *
+  * @param builder the builder that receives the display data
+  */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ // We explicitly do not register base-class data, instead we use the delegate inner source.
+ builder
+ .add(DisplayData.item("source", source.getClass()))
+ .include(source);
+ }
+
+ /**
+ * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+ */
+ @VisibleForTesting
+ public static class BoundedToUnboundedSourceAdapter<T>
+ extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+ private BoundedSource<T> boundedSource;
+
+ public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+ this.boundedSource = boundedSource;
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ /**
+  * Splits the wrapped {@link BoundedSource} into bundles and wraps each bundle in its own
+  * adapter.
+  *
+  * <p>Splitting is best-effort. If {@code desiredNumSplits} is not positive, the computed bundle
+  * size is not positive, {@link BoundedSource#splitIntoBundles} returns {@code null}, or any
+  * exception is thrown while splitting, a warning is logged and this adapter is returned as the
+  * only split.
+  *
+  * @param desiredNumSplits the number of splits requested
+  * @param options the pipeline options handed to the wrapped source
+  * @return an immutable list with one adapter per bundle, or a singleton list holding this
+  *     adapter when the source could not be split
+  */
+ @Override
+ public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+     int desiredNumSplits, PipelineOptions options) throws Exception {
+   if (desiredNumSplits <= 0) {
+     // Guard explicitly rather than relying on the ArithmeticException of the division below.
+     LOG.warn("Non-positive desiredNumSplits {} for {}, skips the initial splits.",
+         desiredNumSplits, boundedSource);
+     return ImmutableList.of(this);
+   }
+   try {
+     long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+     if (desiredBundleSize <= 0) {
+       LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+           boundedSource);
+       return ImmutableList.of(this);
+     }
+     List<? extends BoundedSource<T>> splits
+         = boundedSource.splitIntoBundles(desiredBundleSize, options);
+     if (splits == null) {
+       LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+       return ImmutableList.of(this);
+     }
+     // Lists.transform only returns a lazy view that would build a new adapter on every access;
+     // copy it once so callers get a stable list of adapter instances.
+     return ImmutableList.copyOf(
+         Lists.transform(
+             splits,
+             new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+               @Override
+               public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+                 return new BoundedToUnboundedSourceAdapter<>(input);
+               }
+             }));
+   } catch (Exception e) {
+     // Deliberately broad: any failure while splitting falls back to reading without splits.
+     LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+     return ImmutableList.of(this);
+   }
+ }
+
+ @Override
+ public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+     throws IOException {
+   // Without a checkpoint, the reader starts on the whole wrapped source.
+   if (checkpoint == null) {
+     return new Reader(null /* residualElements */, boundedSource, options);
+   }
+   // Otherwise it resumes from the elements and source stored in the checkpoint.
+   return new Reader(
+       checkpoint.getResidualElements(), checkpoint.getResidualSource(), options);
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder();
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+ return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+ }
+
+ /**
+  * Checkpoint state of the adapter: an optional list of residual elements plus an optional
+  * residual {@link BoundedSource}. Either part may be {@code null}.
+  */
+ @VisibleForTesting
+ static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+   @Nullable private final List<TimestampedValue<T>> residualElements;
+   @Nullable private final BoundedSource<T> residualSource;
+
+   public Checkpoint(
+       @Nullable List<TimestampedValue<T>> residualElements,
+       @Nullable BoundedSource<T> residualSource) {
+     this.residualSource = residualSource;
+     this.residualElements = residualElements;
+   }
+
+   @Override
+   public void finalizeCheckpoint() {
+     // Nothing to do when the checkpoint is finalized.
+   }
+
+   /** Returns the residual elements held by this checkpoint, possibly {@code null}. */
+   @VisibleForTesting
+   @Nullable List<TimestampedValue<T>> getResidualElements() {
+     return residualElements;
+   }
+
+   /** Returns the residual source held by this checkpoint, possibly {@code null}. */
+   @VisibleForTesting
+   @Nullable BoundedSource<T> getResidualSource() {
+     return residualSource;
+   }
+ }
+
+ @VisibleForTesting
+ static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+ @JsonCreator
+ public static CheckpointCoder<?> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ checkArgument(components.size() == 1,
+ "Expecting 1 components, got %s", components.size());
+ return new CheckpointCoder<>(components.get(0));
+ }
+
+ // The coder for a list of residual elements and their timestamps
+ private final Coder<List<TimestampedValue<T>>> elemsCoder;
+ // The coder from the BoundedReader for coding each element
+ private final Coder<T> elemCoder;
+ // The nullable and serializable coder for the BoundedSource.
+ @SuppressWarnings("rawtypes")
+ private final Coder<BoundedSource> sourceCoder;
+
+ CheckpointCoder(Coder<T> elemCoder) {
+ this.elemsCoder = NullableCoder.of(
+ ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+ this.elemCoder = elemCoder;
+ this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+ }
+
+ @Override
+ public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ Context nested = context.nested();
+ elemsCoder.encode(value.residualElements, outStream, nested);
+ sourceCoder.encode(value.residualSource, outStream, nested);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Checkpoint<T> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ Context nested = context.nested();
+ return new Checkpoint<>(
+ elemsCoder.decode(inStream, nested),
+ sourceCoder.decode(inStream, nested));
+ }
+
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return Arrays.<Coder<?>>asList(elemCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+ }
+ }
+
+ /**
+ * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+ * {@link ResidualElements} and {@link ResidualSource}.
+ *
+ * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
+ * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
+ * be split into {@link ResidualElements} and {@link ResidualSource}.
+ */
+ @VisibleForTesting
+ class Reader extends UnboundedReader<T> {
+ private ResidualElements residualElements;
+ private @Nullable ResidualSource residualSource;
+ private final PipelineOptions options;
+ private boolean done;
+
+ Reader(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ init(residualElementsList, residualSource, options);
+ this.options = checkNotNull(options, "options");
+ this.done = false;
+ }
+
+ private void init(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ this.residualElements = residualElementsList == null
+ ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+ : new ResidualElements(residualElementsList);
+ this.residualSource =
+ residualSource == null ? null : new ResidualSource(residualSource, options);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (residualElements.advance()) {
+ return true;
+ } else if (residualSource != null && residualSource.advance()) {
+ return true;
+ } else {
+ done = true;
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (residualSource != null) {
+ residualSource.close();
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrent();
+ } else if (residualSource != null) {
+ return residualSource.getCurrent();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrentTimestamp();
+ } else if (residualSource != null) {
+ return residualSource.getCurrentTimestamp();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>If only part of the {@link ResidualElements} is consumed, the new
+ * checkpoint will contain the remaining elements in {@link ResidualElements} and
+ * the {@link ResidualSource}.
+ *
+ * <p>If all {@link ResidualElements} and part of the
+ * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+ * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+ * {@link ResidualSource} is the source split from the current source,
+ * and {@link ResidualElements} contains rest elements from the current source after
+ * the splitting. For unsplittable source, it will put all remaining elements into
+ * the {@link ResidualElements}.
+ */
+ @Override
+ public Checkpoint<T> getCheckpointMark() {
+ Checkpoint<T> newCheckpoint;
+ if (!residualElements.done()) {
+ // Part of residualElements are consumed.
+ // Checkpoints the remaining elements and residualSource.
+ newCheckpoint = new Checkpoint<>(
+ residualElements.getRestElements(),
+ residualSource == null ? null : residualSource.getSource());
+ } else if (residualSource != null) {
+ newCheckpoint = residualSource.getCheckpointMark();
+ } else {
+ newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+ }
+ // Re-initialize since the residualElements and the residualSource might be
+ // consumed or split by checkpointing.
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+ return newCheckpoint;
+ }
+
+ @Override
+ public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+ return BoundedToUnboundedSourceAdapter.this;
+ }
+ }
+
+ /**
+  * Iterates over a fixed list of timestamped elements, tracking the current element and whether
+  * the list has been exhausted.
+  */
+ private class ResidualElements {
+   private final List<TimestampedValue<T>> elementsList;
+   // Created lazily by the first call to advance().
+   private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+   private @Nullable TimestampedValue<T> currentT;
+   private boolean hasCurrent;
+   private boolean done;
+
+   ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+     this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+     this.elementsIterator = null;
+     this.currentT = null;
+     this.hasCurrent = false;
+     this.done = false;
+   }
+
+   /** Moves to the next element; returns {@code false} and marks this done when none is left. */
+   public boolean advance() {
+     if (elementsIterator == null) {
+       elementsIterator = elementsList.iterator();
+     }
+     if (!elementsIterator.hasNext()) {
+       done = true;
+       hasCurrent = false;
+       return false;
+     }
+     currentT = elementsIterator.next();
+     hasCurrent = true;
+     return true;
+   }
+
+   boolean hasCurrent() {
+     return hasCurrent;
+   }
+
+   boolean done() {
+     return done;
+   }
+
+   TimestampedValue<T> getCurrentTimestampedValue() {
+     if (hasCurrent) {
+       return currentT;
+     }
+     throw new NoSuchElementException();
+   }
+
+   T getCurrent() {
+     TimestampedValue<T> current = getCurrentTimestampedValue();
+     return current.getValue();
+   }
+
+   Instant getCurrentTimestamp() {
+     TimestampedValue<T> current = getCurrentTimestampedValue();
+     return current.getTimestamp();
+   }
+
+   /**
+    * Returns the elements not yet handed out: the original list if iteration has not started,
+    * otherwise a new list holding whatever the iterator still has (which drains the iterator).
+    */
+   List<TimestampedValue<T>> getRestElements() {
+     if (elementsIterator == null) {
+       return elementsList;
+     }
+     List<TimestampedValue<T>> remaining = Lists.newArrayList(elementsIterator);
+     return remaining;
+   }
+ }
+
+ /**
+  * Holds the remainder of a {@link BoundedSource} that has not been checkpointed yet, together
+  * with the {@link BoundedReader} that is lazily opened on it by the first {@link #advance()}.
+  */
+ private class ResidualSource {
+   private BoundedSource<T> residualSource;
+   private PipelineOptions options;
+   private @Nullable BoundedReader<T> reader;
+   private boolean closed;
+   // Set once the reader has reported that no elements remain, so it is never advanced again.
+   private boolean readerDone;
+
+   public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+     this.residualSource = checkNotNull(residualSource, "residualSource");
+     this.options = checkNotNull(options, "options");
+     this.reader = null;
+     this.closed = false;
+     this.readerDone = false;
+   }
+
+   /**
+    * Moves to the next element, creating and starting the reader on the first call.
+    *
+    * @return {@code true} if an element is available; {@code false} once the reader is exhausted
+    *     (and on every later call, without touching the reader again)
+    * @throws IllegalStateException if this source has already been closed
+    * @throws IOException if the underlying reader fails
+    */
+   private boolean advance() throws IOException {
+     if (closed) {
+       // Previously this path dereferenced the null reader and failed with a bare NPE.
+       throw new IllegalStateException(
+           "advance() called on a closed ResidualSource for " + residualSource);
+     }
+     if (readerDone) {
+       return false;
+     }
+     if (reader == null) {
+       reader = residualSource.createReader(options);
+       readerDone = !reader.start();
+     } else {
+       readerDone = !reader.advance();
+     }
+     return !readerDone;
+   }
+
+   T getCurrent() throws NoSuchElementException {
+     if (reader == null) {
+       throw new NoSuchElementException();
+     }
+     return reader.getCurrent();
+   }
+
+   Instant getCurrentTimestamp() throws NoSuchElementException {
+     if (reader == null) {
+       throw new NoSuchElementException();
+     }
+     return reader.getCurrentTimestamp();
+   }
+
+   /** Closes the reader if one was opened and marks this source as closed. */
+   void close() throws IOException {
+     if (reader != null) {
+       reader.close();
+       reader = null;
+     }
+     closed = true;
+   }
+
+   BoundedSource<T> getSource() {
+     return residualSource;
+   }
+
+   /**
+    * Produces a checkpoint for the remainder of this source.
+    *
+    * <p>If no reader has been opened, the checkpoint simply refers to the whole source. Otherwise
+    * the reader is asked to split off a residual source at up to {@code splitAttempts} evenly
+    * spaced fractions of the unconsumed range, the elements still left in the reader are read
+    * into memory, and the reader is closed because its remaining content is now fully captured
+    * by the checkpoint.
+    *
+    * @throws RuntimeException if reading the remaining elements fails
+    */
+   Checkpoint<T> getCheckpointMark() {
+     if (reader == null) {
+       // Reader hasn't started, checkpoint the residualSource.
+       return new Checkpoint<>(null /* residualElements */, residualSource);
+     } else {
+       // Part of residualSource are consumed.
+       // Splits the residualSource and tracks the new residualElements in current source.
+       BoundedSource<T> residualSplit = null;
+       Double fractionConsumed = reader.getFractionConsumed();
+       if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+         double fractionRest = 1 - fractionConsumed;
+         int splitAttempts = 8;
+         for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
+           double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+           residualSplit = reader.splitAtFraction(fractionToSplit);
+         }
+       }
+       List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+       try {
+         while (advance()) {
+           newResidualElements.add(
+               TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+         }
+       } catch (IOException e) {
+         // The reader is left open here; it is still reachable through close().
+         throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+       }
+       try {
+         // The drained reader is no longer needed; release it instead of leaking it.
+         close();
+       } catch (IOException e) {
+         // The checkpoint data is already complete, so a failed close must not fail it.
+         LOG.warn("Failed to close the bounded reader of {} after checkpointing.",
+             residualSource, e);
+       }
+       return new Checkpoint<>(newResidualElements, residualSplit);
+     }
+   }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 999dc3a..0cf1ade 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -56,8 +56,6 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroSource;
-import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -898,34 +896,6 @@ public class DataflowRunnerTest {
}
@Test
- public void testBoundedSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(
- AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
- }
-
- @Test
- public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(
- BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
- }
-
- @Test
- public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(
- AvroIO.Read.from("foo"), "AvroIO.Read", true);
- }
-
- @Test
- public void testTextIOSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
- }
-
- @Test
- public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
- }
-
- @Test
public void testReadUnboundedUnsupportedInBatch() throws Exception {
testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
}
[2/2] incubator-beam git commit: Closes #550
Posted by dh...@apache.org.
Closes #550
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12b6ff8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12b6ff8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12b6ff8d
Branch: refs/heads/master
Commit: 12b6ff8d79b03b195d184acf5d718a82254e880e
Parents: e01efbd 4f2ac39
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 29 22:45:45 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 29 22:45:45 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 34 +-
.../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
.../runners/dataflow/DataflowRunnerTest.java | 30 -
3 files changed, 577 insertions(+), 34 deletions(-)
----------------------------------------------------------------------