You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/10 17:16:12 UTC
[1/3] beam git commit: [BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java
Repository: beam
Updated Branches:
refs/heads/master e53f959f9 -> e0e39a975
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
deleted file mode 100644
index cfb5ebc..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
- * {@link BoundedSource}.
- * Sources that cannot be split are read entirely into memory, so this transform does not work well
- * with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- *
- * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
- * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
- */
-@Deprecated
-class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
-
- private final BoundedSource<T> source;
-
- /**
- * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
- */
- public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
- this.source = source;
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- return input.getPipeline().apply(
- Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
- }
-
- @Override
- public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- // We explicitly do not register base-class data, instead we use the delegate inner source.
- builder
- .add(DisplayData.item("source", source.getClass()))
- .include("source", source);
- }
-
- /**
- * A {@code BoundedSource} to {@code UnboundedSource} adapter.
- */
- @VisibleForTesting
- public static class BoundedToUnboundedSourceAdapter<T>
- extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
- private BoundedSource<T> boundedSource;
-
- public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
- this.boundedSource = boundedSource;
- }
-
- @Override
- public void validate() {
- boundedSource.validate();
- }
-
- @Override
- public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- try {
- long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
- if (desiredBundleSize <= 0) {
- LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
- boundedSource);
- return ImmutableList.of(this);
- }
- List<? extends BoundedSource<T>> splits =
- boundedSource.splitIntoBundles(desiredBundleSize, options);
- if (splits == null) {
- LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
- return ImmutableList.of(this);
- }
- return Lists.transform(
- splits,
- new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
- @Override
- public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
- return new BoundedToUnboundedSourceAdapter<>(input);
- }});
- } catch (Exception e) {
- LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
- return ImmutableList.of(this);
- }
- }
-
- @Override
- public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
- throws IOException {
- if (checkpoint == null) {
- return new Reader(null /* residualElements */, boundedSource, options);
- } else {
- return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("source", boundedSource.getClass()));
- builder.include("source", boundedSource);
- }
-
- @VisibleForTesting
- static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
- private final @Nullable List<TimestampedValue<T>> residualElements;
- private final @Nullable BoundedSource<T> residualSource;
-
- public Checkpoint(
- @Nullable List<TimestampedValue<T>> residualElements,
- @Nullable BoundedSource<T> residualSource) {
- this.residualElements = residualElements;
- this.residualSource = residualSource;
- }
-
- @Override
- public void finalizeCheckpoint() {}
-
- @VisibleForTesting
- @Nullable List<TimestampedValue<T>> getResidualElements() {
- return residualElements;
- }
-
- @VisibleForTesting
- @Nullable BoundedSource<T> getResidualSource() {
- return residualSource;
- }
- }
-
- @VisibleForTesting
- static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
- @JsonCreator
- public static CheckpointCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1,
- "Expecting 1 components, got %s", components.size());
- return new CheckpointCoder<>(components.get(0));
- }
-
- // The coder for a list of residual elements and their timestamps
- private final Coder<List<TimestampedValue<T>>> elemsCoder;
- // The coder from the BoundedReader for coding each element
- private final Coder<T> elemCoder;
- // The nullable and serializable coder for the BoundedSource.
- @SuppressWarnings("rawtypes")
- private final Coder<BoundedSource> sourceCoder;
-
- CheckpointCoder(Coder<T> elemCoder) {
- this.elemsCoder = NullableCoder.of(
- ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
- this.elemCoder = elemCoder;
- this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
- }
-
- @Override
- public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- elemsCoder.encode(value.residualElements, outStream, context.nested());
- sourceCoder.encode(value.residualSource, outStream, context);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Checkpoint<T> decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return new Checkpoint<>(
- elemsCoder.decode(inStream, context.nested()),
- sourceCoder.decode(inStream, context));
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return Arrays.<Coder<?>>asList(elemCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
- }
- }
-
- /**
- * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
- * {@link ResidualElements} and {@link ResidualSource}.
- *
- * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
- * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
- * be split into {@link ResidualElements} and {@link ResidualSource}.
- */
- @VisibleForTesting
- class Reader extends UnboundedReader<T> {
- private ResidualElements residualElements;
- private @Nullable ResidualSource residualSource;
- private final PipelineOptions options;
- private boolean done;
-
- Reader(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- init(residualElementsList, residualSource, options);
- this.options = checkNotNull(options, "options");
- this.done = false;
- }
-
- private void init(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- this.residualElements = residualElementsList == null
- ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
- : new ResidualElements(residualElementsList);
- this.residualSource =
- residualSource == null ? null : new ResidualSource(residualSource, options);
- }
-
- @Override
- public boolean start() throws IOException {
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- if (residualElements.advance()) {
- return true;
- } else if (residualSource != null && residualSource.advance()) {
- return true;
- } else {
- done = true;
- return false;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (residualSource != null) {
- residualSource.close();
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrent();
- } else if (residualSource != null) {
- return residualSource.getCurrent();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrentTimestamp();
- } else if (residualSource != null) {
- return residualSource.getCurrentTimestamp();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getWatermark() {
- return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>If only part of the {@link ResidualElements} is consumed, the new
- * checkpoint will contain the remaining elements in {@link ResidualElements} and
- * the {@link ResidualSource}.
- *
- * <p>If all {@link ResidualElements} and part of the
- * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
- * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
- * {@link ResidualSource} is the source split from the current source,
- * and {@link ResidualElements} contains rest elements from the current source after
- * the splitting. For unsplittable source, it will put all remaining elements into
- * the {@link ResidualElements}.
- */
- @Override
- public Checkpoint<T> getCheckpointMark() {
- Checkpoint<T> newCheckpoint;
- if (!residualElements.done()) {
- // Part of residualElements are consumed.
- // Checkpoints the remaining elements and residualSource.
- newCheckpoint = new Checkpoint<>(
- residualElements.getRestElements(),
- residualSource == null ? null : residualSource.getSource());
- } else if (residualSource != null) {
- newCheckpoint = residualSource.getCheckpointMark();
- } else {
- newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
- }
- // Re-initialize since the residualElements and the residualSource might be
- // consumed or split by checkpointing.
- init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
- return newCheckpoint;
- }
-
- @Override
- public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
- return BoundedToUnboundedSourceAdapter.this;
- }
- }
-
- private class ResidualElements {
- private final List<TimestampedValue<T>> elementsList;
- private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
- private @Nullable TimestampedValue<T> currentT;
- private boolean hasCurrent;
- private boolean done;
-
- ResidualElements(List<TimestampedValue<T>> residualElementsList) {
- this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
- this.elementsIterator = null;
- this.currentT = null;
- this.hasCurrent = false;
- this.done = false;
- }
-
- public boolean advance() {
- if (elementsIterator == null) {
- elementsIterator = elementsList.iterator();
- }
- if (elementsIterator.hasNext()) {
- currentT = elementsIterator.next();
- hasCurrent = true;
- return true;
- } else {
- done = true;
- hasCurrent = false;
- return false;
- }
- }
-
- boolean hasCurrent() {
- return hasCurrent;
- }
-
- boolean done() {
- return done;
- }
-
- TimestampedValue<T> getCurrentTimestampedValue() {
- if (!hasCurrent) {
- throw new NoSuchElementException();
- }
- return currentT;
- }
-
- T getCurrent() {
- return getCurrentTimestampedValue().getValue();
- }
-
- Instant getCurrentTimestamp() {
- return getCurrentTimestampedValue().getTimestamp();
- }
-
- List<TimestampedValue<T>> getRestElements() {
- if (elementsIterator == null) {
- return elementsList;
- } else {
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- while (elementsIterator.hasNext()) {
- newResidualElements.add(elementsIterator.next());
- }
- return newResidualElements;
- }
- }
- }
-
- private class ResidualSource {
- private BoundedSource<T> residualSource;
- private PipelineOptions options;
- private @Nullable BoundedReader<T> reader;
- private boolean closed;
-
- public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
- this.residualSource = checkNotNull(residualSource, "residualSource");
- this.options = checkNotNull(options, "options");
- this.reader = null;
- this.closed = false;
- }
-
- private boolean advance() throws IOException {
- if (reader == null && !closed) {
- reader = residualSource.createReader(options);
- return reader.start();
- } else {
- return reader.advance();
- }
- }
-
- T getCurrent() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrent();
- }
-
- Instant getCurrentTimestamp() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrentTimestamp();
- }
-
- void close() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- closed = true;
- }
-
- BoundedSource<T> getSource() {
- return residualSource;
- }
-
- Checkpoint<T> getCheckpointMark() {
- if (reader == null) {
- // Reader hasn't started, checkpoint the residualSource.
- return new Checkpoint<>(null /* residualElements */, residualSource);
- } else {
- // Part of residualSource are consumed.
- // Splits the residualSource and tracks the new residualElements in current source.
- BoundedSource<T> residualSplit = null;
- Double fractionConsumed = reader.getFractionConsumed();
- if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
- double fractionRest = 1 - fractionConsumed;
- int splitAttempts = 8;
- for (int i = 0; i < 8 && residualSplit == null; ++i) {
- double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
- residualSplit = reader.splitAtFraction(fractionToSplit);
- }
- }
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- try {
- while (advance()) {
- newResidualElements.add(
- TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to read elements from the bounded reader.", e);
- }
- return new Checkpoint<>(newResidualElements, residualSplit);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
deleted file mode 100644
index c479332..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.dataflow;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@see DataflowUnboundedReadFromBoundedSource}.
- */
-@RunWith(JUnit4.class)
-public class DataflowUnboundedReadFromBoundedSourceTest {
- @Test
- public void testKind() {
- DataflowUnboundedReadFromBoundedSource<?> read = new
- DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
-
- assertEquals("Read(NoopNamedSource)", read.getKindString());
- }
-
- @Test
- public void testKindAnonymousSource() {
- NoopNamedSource anonSource = new NoopNamedSource() {};
- DataflowUnboundedReadFromBoundedSource<?> read = new
- DataflowUnboundedReadFromBoundedSource<>(anonSource);
-
- assertEquals("Read(AnonymousSource)", read.getKindString());
- }
-
- /** Source implementation only useful for its identity. */
- static class NoopNamedSource extends BoundedSource<String> {
- @Override
- public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
- return null;
- }
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 0;
- }
- @Override
- public BoundedReader<String> createReader(
- PipelineOptions options) throws IOException {
- return null;
- }
- @Override
- public void validate() {
-
- }
- @Override
- public Coder<String> getDefaultOutputCoder() {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 988a82b..fcc00f9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -31,9 +31,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
[2/3] beam git commit: [BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java
Posted by dh...@apache.org.
[BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java
Now that it's in the construction package, we can delete the
Dataflow-specific clone that we couldn't use from runners-core.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66229142
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66229142
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66229142
Branch: refs/heads/master
Commit: 662291426af2c82c35085053fac0e9c2b845339f
Parents: e53f959
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 10 07:08:32 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 10 10:16:04 2017 -0700
----------------------------------------------------------------------
.../translation/ApexPipelineTranslator.java | 2 +-
runners/core-construction-java/pom.xml | 15 +
.../UnboundedReadFromBoundedSource.java | 542 ++++++++++++++++++
.../UnboundedReadFromBoundedSourceTest.java | 373 +++++++++++++
runners/core-java/pom.xml | 5 -
.../core/UnboundedReadFromBoundedSource.java | 542 ------------------
.../UnboundedReadFromBoundedSourceTest.java | 373 -------------
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
.../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
...aflowUnboundedReadFromBoundedSourceTest.java | 79 ---
.../beam/runners/spark/TestSparkRunner.java | 2 +-
11 files changed, 934 insertions(+), 1549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 42ff144..fdeefc7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.runners.core.construction.PrimitiveCreate;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.TransformHierarchy;
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 9619280..78b6819 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -60,6 +60,16 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
@@ -69,6 +79,11 @@
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..6b7bd71
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
+ *
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * and element timestamps are propagated. While any elements remain, the watermark is the beginning
+ * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
+ * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
+ * {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ */
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
+
+ private final BoundedSource<T> source;
+
+ /**
+ * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+ */
+ public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
+ this.source = source;
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ return input.getPipeline().apply(
+ Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+ }
+
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return source.getDefaultOutputCoder();
+ }
+
+ @Override
+ public String getKindString() {
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ // We explicitly do not register base-class data, instead we use the delegate inner source.
+ builder
+ .add(DisplayData.item("source", source.getClass()))
+ .include("source", source);
+ }
+
+ /**
+ * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+ */
+ @VisibleForTesting
+ public static class BoundedToUnboundedSourceAdapter<T>
+ extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+ private BoundedSource<T> boundedSource;
+
+ public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+ this.boundedSource = boundedSource;
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ @Override
+ public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ try {
+ long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+ if (desiredBundleSize <= 0) {
+ LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+ boundedSource);
+ return ImmutableList.of(this);
+ }
+ List<? extends BoundedSource<T>> splits =
+ boundedSource.splitIntoBundles(desiredBundleSize, options);
+ if (splits == null) {
+ LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+ return ImmutableList.of(this);
+ }
+ return Lists.transform(
+ splits,
+ new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+ @Override
+ public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+ return new BoundedToUnboundedSourceAdapter<>(input);
+ }});
+ } catch (Exception e) {
+ LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+ return ImmutableList.of(this);
+ }
+ }
+
+ @Override
+ public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+ throws IOException {
+ if (checkpoint == null) {
+ return new Reader(null /* residualElements */, boundedSource, options);
+ } else {
+ return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+ }
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder();
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+ return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+ }
+
+ @VisibleForTesting
+ static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+ private final @Nullable List<TimestampedValue<T>> residualElements;
+ private final @Nullable BoundedSource<T> residualSource;
+
+ public Checkpoint(
+ @Nullable List<TimestampedValue<T>> residualElements,
+ @Nullable BoundedSource<T> residualSource) {
+ this.residualElements = residualElements;
+ this.residualSource = residualSource;
+ }
+
+ @Override
+ public void finalizeCheckpoint() {}
+
+ @VisibleForTesting
+ @Nullable List<TimestampedValue<T>> getResidualElements() {
+ return residualElements;
+ }
+
+ @VisibleForTesting
+ @Nullable BoundedSource<T> getResidualSource() {
+ return residualSource;
+ }
+ }
+
+ @VisibleForTesting
+ static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+ @JsonCreator
+ public static CheckpointCoder<?> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ checkArgument(components.size() == 1,
+ "Expecting 1 components, got %s", components.size());
+ return new CheckpointCoder<>(components.get(0));
+ }
+
+ // The coder for a list of residual elements and their timestamps
+ private final Coder<List<TimestampedValue<T>>> elemsCoder;
+ // The coder from the BoundedReader for coding each element
+ private final Coder<T> elemCoder;
+ // The nullable and serializable coder for the BoundedSource.
+ @SuppressWarnings("rawtypes")
+ private final Coder<BoundedSource> sourceCoder;
+
+ CheckpointCoder(Coder<T> elemCoder) {
+ this.elemsCoder = NullableCoder.of(
+ ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+ this.elemCoder = elemCoder;
+ this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+ }
+
+ @Override
+ public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ elemsCoder.encode(value.residualElements, outStream, context.nested());
+ sourceCoder.encode(value.residualSource, outStream, context);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Checkpoint<T> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ return new Checkpoint<>(
+ elemsCoder.decode(inStream, context.nested()),
+ sourceCoder.decode(inStream, context));
+ }
+
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return Arrays.<Coder<?>>asList(elemCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+ }
+ }
+
+ /**
+ * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+ * {@link ResidualElements} and {@link ResidualSource}.
+ *
+ * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
+ * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
+ * be split into {@link ResidualElements} and {@link ResidualSource}.
+ */
+ @VisibleForTesting
+ class Reader extends UnboundedReader<T> {
+ private ResidualElements residualElements;
+ private @Nullable ResidualSource residualSource;
+ private final PipelineOptions options;
+ private boolean done;
+
+ Reader(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ init(residualElementsList, residualSource, options);
+ this.options = checkNotNull(options, "options");
+ this.done = false;
+ }
+
+ private void init(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ this.residualElements = residualElementsList == null
+ ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+ : new ResidualElements(residualElementsList);
+ this.residualSource =
+ residualSource == null ? null : new ResidualSource(residualSource, options);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (residualElements.advance()) {
+ return true;
+ } else if (residualSource != null && residualSource.advance()) {
+ return true;
+ } else {
+ done = true;
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (residualSource != null) {
+ residualSource.close();
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrent();
+ } else if (residualSource != null) {
+ return residualSource.getCurrent();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrentTimestamp();
+ } else if (residualSource != null) {
+ return residualSource.getCurrentTimestamp();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>If only part of the {@link ResidualElements} is consumed, the new
+ * checkpoint will contain the remaining elements in {@link ResidualElements} and
+ * the {@link ResidualSource}.
+ *
+ * <p>If all {@link ResidualElements} and part of the
+ * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+ * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+ * {@link ResidualSource} is the source split from the current source,
+ * and {@link ResidualElements} contains rest elements from the current source after
+ * the splitting. For unsplittable source, it will put all remaining elements into
+ * the {@link ResidualElements}.
+ */
+ @Override
+ public Checkpoint<T> getCheckpointMark() {
+ Checkpoint<T> newCheckpoint;
+ if (!residualElements.done()) {
+ // Part of residualElements are consumed.
+ // Checkpoints the remaining elements and residualSource.
+ newCheckpoint = new Checkpoint<>(
+ residualElements.getRestElements(),
+ residualSource == null ? null : residualSource.getSource());
+ } else if (residualSource != null) {
+ newCheckpoint = residualSource.getCheckpointMark();
+ } else {
+ newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+ }
+ // Re-initialize since the residualElements and the residualSource might be
+ // consumed or split by checkpointing.
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+ return newCheckpoint;
+ }
+
+ @Override
+ public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+ return BoundedToUnboundedSourceAdapter.this;
+ }
+ }
+
+ private class ResidualElements {
+ private final List<TimestampedValue<T>> elementsList;
+ private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+ private @Nullable TimestampedValue<T> currentT;
+ private boolean hasCurrent;
+ private boolean done;
+
+ ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+ this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+ this.elementsIterator = null;
+ this.currentT = null;
+ this.hasCurrent = false;
+ this.done = false;
+ }
+
+ public boolean advance() {
+ if (elementsIterator == null) {
+ elementsIterator = elementsList.iterator();
+ }
+ if (elementsIterator.hasNext()) {
+ currentT = elementsIterator.next();
+ hasCurrent = true;
+ return true;
+ } else {
+ done = true;
+ hasCurrent = false;
+ return false;
+ }
+ }
+
+ boolean hasCurrent() {
+ return hasCurrent;
+ }
+
+ boolean done() {
+ return done;
+ }
+
+ TimestampedValue<T> getCurrentTimestampedValue() {
+ if (!hasCurrent) {
+ throw new NoSuchElementException();
+ }
+ return currentT;
+ }
+
+ T getCurrent() {
+ return getCurrentTimestampedValue().getValue();
+ }
+
+ Instant getCurrentTimestamp() {
+ return getCurrentTimestampedValue().getTimestamp();
+ }
+
+ List<TimestampedValue<T>> getRestElements() {
+ if (elementsIterator == null) {
+ return elementsList;
+ } else {
+ List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+ while (elementsIterator.hasNext()) {
+ newResidualElements.add(elementsIterator.next());
+ }
+ return newResidualElements;
+ }
+ }
+ }
+
+ private class ResidualSource {
+ private BoundedSource<T> residualSource;
+ private PipelineOptions options;
+ private @Nullable BoundedReader<T> reader;
+ private boolean closed;
+ private boolean readerDone;
+
+ public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+ this.residualSource = checkNotNull(residualSource, "residualSource");
+ this.options = checkNotNull(options, "options");
+ this.reader = null;
+ this.closed = false;
+ this.readerDone = false;
+ }
+
+ private boolean advance() throws IOException {
+ checkArgument(!closed, "advance() call on closed %s", getClass().getName());
+ if (readerDone) {
+ return false;
+ }
+ if (reader == null) {
+ reader = residualSource.createReader(options);
+ readerDone = !reader.start();
+ } else {
+ readerDone = !reader.advance();
+ }
+ return !readerDone;
+ }
+
+ T getCurrent() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrent();
+ }
+
+ Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrentTimestamp();
+ }
+
+ void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ closed = true;
+ }
+
+ BoundedSource<T> getSource() {
+ return residualSource;
+ }
+
+ Checkpoint<T> getCheckpointMark() {
+ if (reader == null) {
+ // Reader hasn't started, checkpoint the residualSource.
+ return new Checkpoint<>(null /* residualElements */, residualSource);
+ } else {
+ // Part of residualSource are consumed.
+ // Splits the residualSource and tracks the new residualElements in current source.
+ BoundedSource<T> residualSplit = null;
+ Double fractionConsumed = reader.getFractionConsumed();
+ if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+ double fractionRest = 1 - fractionConsumed;
+ int splitAttempts = 8;
+ for (int i = 0; i < 8 && residualSplit == null; ++i) {
+ double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+ residualSplit = reader.splitAtFraction(fractionToSplit);
+ }
+ }
+ List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+ try {
+ while (advance()) {
+ newResidualElements.add(
+ TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+ }
+ return new Checkpoint<>(newResidualElements, residualSplit);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..c905cf5
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link UnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadFromBoundedSourceTest {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void testCheckpointCoderNulls() throws Exception {
+    CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of());
+    Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null);
+    Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray(
+        coder,
+        CoderUtils.encodeToByteArray(coder, emptyCheckpoint));
+    assertNull(decodedEmptyCheckpoint.getResidualElements());
+    assertNull(decodedEmptyCheckpoint.getResidualSource());
+  }
+
+  @Test
+  public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testBoundedToUnboundedSourceAdapter() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PCollection<Long> output =
+        p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
+
+    // Count == numElements
+    PAssert
+        .thatSingleton(output.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Unique count == numElements
+    PAssert
+        .thatSingleton(output.apply(Distinct.<Long>create())
+            .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Min == 0
+    PAssert
+        .thatSingleton(output.apply("Min", Min.<Long>globally()))
+        .isEqualTo(0L);
+    // Max == numElements-1
+    PAssert
+        .thatSingleton(output.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+    p.run();
+  }
+
+  @Test
+  public void testCountingSourceToUnboundedCheckpoint() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    List<Long> expected = Lists.newArrayList();
+    for (long i = 0; i < numElements; ++i) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
+  }
+
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(100);
+    writeFile(compressedFile, input);
+
+    BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
+  }
+
+  /**
+   * Reads {@code boundedSource} through a {@link BoundedToUnboundedSourceAdapter} in a single
+   * reader, taking and finalizing a checkpoint after every 9th element, then checks that the
+   * final checkpoint has no residual elements and that the elements read match
+   * {@code expectedElements} in count and distinct values.
+   */
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        unboundedSource.createReader(options, null);
+
+    List<T> actual = Lists.newArrayList();
+    try {
+      for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+        actual.add(reader.getCurrent());
+        // checkpoint every 9 elements
+        if (actual.size() % 9 == 0) {
+          Checkpoint<T> checkpoint = reader.getCheckpointMark();
+          checkpoint.finalizeCheckpoint();
+        }
+      }
+      Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+      assertTrue(checkpointDone.getResidualElements() == null
+          || checkpointDone.getResidualElements().isEmpty());
+    } finally {
+      // Release the underlying bounded reader (e.g. the open file channel).
+      reader.close();
+    }
+
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  @Test
+  public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    List<Long> expected = Lists.newArrayList();
+    for (long i = 0; i < numElements; ++i) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
+  }
+
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(1000);
+    writeFile(compressedFile, input);
+
+    BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
+  }
+
+  /**
+   * Like {@link #testBoundedToUnboundedSourceAdapterCheckpoint}, but after every 9th element the
+   * checkpoint is round-tripped through the checkpoint coder and reading resumes in a fresh
+   * reader created from the decoded checkpoint.
+   */
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        unboundedSource.createReader(options, null);
+
+    List<T> actual = Lists.newArrayList();
+    try {
+      for (boolean hasNext = reader.start(); hasNext;) {
+        actual.add(reader.getCurrent());
+        // checkpoint every 9 elements
+        if (actual.size() % 9 == 0) {
+          Checkpoint<T> checkpoint = reader.getCheckpointMark();
+          Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder();
+          Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray(
+              checkpointCoder,
+              CoderUtils.encodeToByteArray(checkpointCoder, checkpoint));
+          reader.close();
+          checkpoint.finalizeCheckpoint();
+
+          BoundedToUnboundedSourceAdapter<T>.Reader restarted =
+              unboundedSource.createReader(options, decodedCheckpoint);
+          reader = restarted;
+          hasNext = reader.start();
+        } else {
+          hasNext = reader.advance();
+        }
+      }
+      Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+      assertTrue(checkpointDone.getResidualElements() == null
+          || checkpointDone.getResidualElements().isEmpty());
+    } finally {
+      // Release the last reader; closing an already-closed reader is harmless.
+      reader.close();
+    }
+
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  @Test
+  public void testReadBeforeStart() throws Exception {
+    thrown.expect(NoSuchElementException.class);
+
+    BoundedSource<Long> countingSource = CountingSource.upTo(100);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(countingSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    unboundedSource.createReader(options, null).getCurrent();
+  }
+
+  @Test
+  public void testReadFromCheckpointBeforeStart() throws Exception {
+    thrown.expect(NoSuchElementException.class);
+
+    BoundedSource<Long> countingSource = CountingSource.upTo(100);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(countingSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<TimestampedValue<Long>> elements =
+        ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
+    Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource);
+    unboundedSource.createReader(options, checkpoint).getCurrent();
+  }
+
+  /**
+   * Generate byte array of given size.
+   */
+  private static byte[] generateInput(int size) {
+    // Arbitrary but fixed seed
+    Random random = new Random(285930);
+    byte[] buff = new byte[size];
+    random.nextBytes(buff);
+    return buff;
+  }
+
+  /**
+   * Writes a single output file.
+   */
+  private static void writeFile(File file, byte[] input) throws IOException {
+    try (OutputStream os = new FileOutputStream(file)) {
+      os.write(input);
+    }
+  }
+
+  /**
+   * Unsplittable source for use in tests: emits the file one byte at a time and its reader
+   * refuses dynamic splitting.
+   */
+  private static class UnsplittableSource extends FileBasedSource<Byte> {
+    public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
+      super(fileOrPatternSpec, minBundleSize);
+    }
+
+    public UnsplittableSource(
+        String fileName, long minBundleSize, long startOffset, long endOffset) {
+      super(fileName, minBundleSize, startOffset, endOffset);
+    }
+
+    @Override
+    protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
+      return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
+    }
+
+    @Override
+    protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
+      return new UnsplittableReader(this);
+    }
+
+    @Override
+    public Coder<Byte> getDefaultOutputCoder() {
+      return SerializableCoder.of(Byte.class);
+    }
+
+    private static class UnsplittableReader extends FileBasedReader<Byte> {
+      ByteBuffer buff = ByteBuffer.allocate(1);
+      Byte current;
+      long offset;
+      ReadableByteChannel channel;
+
+      public UnsplittableReader(UnsplittableSource source) {
+        super(source);
+        offset = source.getStartOffset() - 1;
+      }
+
+      @Override
+      public Byte getCurrent() throws NoSuchElementException {
+        return current;
+      }
+
+      @Override
+      public boolean allowsDynamicSplitting() {
+        return false;
+      }
+
+      @Override
+      protected boolean isAtSplitPoint() {
+        return true;
+      }
+
+      @Override
+      protected void startReading(ReadableByteChannel channel) throws IOException {
+        this.channel = channel;
+      }
+
+      @Override
+      protected boolean readNextRecord() throws IOException {
+        buff.clear();
+        if (channel.read(buff) != 1) {
+          return false;
+        }
+        current = buff.get(0);
+        offset += 1;
+        return true;
+      }
+
+      @Override
+      protected long getCurrentOffset() {
+        return offset;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 631e8c0..affd1a9 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -105,11 +105,6 @@
<artifactId>joda-time</artifactId>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
<!-- test dependencies -->
<!-- Utilities such as WindowMatchers -->
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
deleted file mode 100644
index 0c173a0..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
- * {@link BoundedSource}.
- * Sources that cannot be split are read entirely into memory, so this transform does not work well
- * with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- */
-public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
-
- private final BoundedSource<T> source;
-
- /**
- * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
- */
- public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
- this.source = source;
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- return input.getPipeline().apply(
- Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
- }
-
- @Override
- public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- // We explicitly do not register base-class data, instead we use the delegate inner source.
- builder
- .add(DisplayData.item("source", source.getClass()))
- .include("source", source);
- }
-
- /**
- * A {@code BoundedSource} to {@code UnboundedSource} adapter.
- */
- @VisibleForTesting
- public static class BoundedToUnboundedSourceAdapter<T>
- extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
- private BoundedSource<T> boundedSource;
-
- public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
- this.boundedSource = boundedSource;
- }
-
- @Override
- public void validate() {
- boundedSource.validate();
- }
-
- @Override
- public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- try {
- long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
- if (desiredBundleSize <= 0) {
- LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
- boundedSource);
- return ImmutableList.of(this);
- }
- List<? extends BoundedSource<T>> splits =
- boundedSource.splitIntoBundles(desiredBundleSize, options);
- if (splits == null) {
- LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
- return ImmutableList.of(this);
- }
- return Lists.transform(
- splits,
- new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
- @Override
- public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
- return new BoundedToUnboundedSourceAdapter<>(input);
- }});
- } catch (Exception e) {
- LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
- return ImmutableList.of(this);
- }
- }
-
- @Override
- public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
- throws IOException {
- if (checkpoint == null) {
- return new Reader(null /* residualElements */, boundedSource, options);
- } else {
- return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
- }
-
- @VisibleForTesting
- static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
- private final @Nullable List<TimestampedValue<T>> residualElements;
- private final @Nullable BoundedSource<T> residualSource;
-
- public Checkpoint(
- @Nullable List<TimestampedValue<T>> residualElements,
- @Nullable BoundedSource<T> residualSource) {
- this.residualElements = residualElements;
- this.residualSource = residualSource;
- }
-
- @Override
- public void finalizeCheckpoint() {}
-
- @VisibleForTesting
- @Nullable List<TimestampedValue<T>> getResidualElements() {
- return residualElements;
- }
-
- @VisibleForTesting
- @Nullable BoundedSource<T> getResidualSource() {
- return residualSource;
- }
- }
-
- @VisibleForTesting
- static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
- @JsonCreator
- public static CheckpointCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1,
- "Expecting 1 components, got %s", components.size());
- return new CheckpointCoder<>(components.get(0));
- }
-
- // The coder for a list of residual elements and their timestamps
- private final Coder<List<TimestampedValue<T>>> elemsCoder;
- // The coder from the BoundedReader for coding each element
- private final Coder<T> elemCoder;
- // The nullable and serializable coder for the BoundedSource.
- @SuppressWarnings("rawtypes")
- private final Coder<BoundedSource> sourceCoder;
-
- CheckpointCoder(Coder<T> elemCoder) {
- this.elemsCoder = NullableCoder.of(
- ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
- this.elemCoder = elemCoder;
- this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
- }
-
- @Override
- public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- elemsCoder.encode(value.residualElements, outStream, context.nested());
- sourceCoder.encode(value.residualSource, outStream, context);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Checkpoint<T> decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return new Checkpoint<>(
- elemsCoder.decode(inStream, context.nested()),
- sourceCoder.decode(inStream, context));
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return Arrays.<Coder<?>>asList(elemCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
- }
- }
-
- /**
- * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
- * {@link ResidualElements} and {@link ResidualSource}.
- *
- * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
- * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
- * be split into {@link ResidualElements} and {@link ResidualSource}.
- */
- @VisibleForTesting
- class Reader extends UnboundedReader<T> {
- private ResidualElements residualElements;
- private @Nullable ResidualSource residualSource;
- private final PipelineOptions options;
- private boolean done;
-
- Reader(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- init(residualElementsList, residualSource, options);
- this.options = checkNotNull(options, "options");
- this.done = false;
- }
-
- private void init(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- this.residualElements = residualElementsList == null
- ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
- : new ResidualElements(residualElementsList);
- this.residualSource =
- residualSource == null ? null : new ResidualSource(residualSource, options);
- }
-
- @Override
- public boolean start() throws IOException {
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- if (residualElements.advance()) {
- return true;
- } else if (residualSource != null && residualSource.advance()) {
- return true;
- } else {
- done = true;
- return false;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (residualSource != null) {
- residualSource.close();
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrent();
- } else if (residualSource != null) {
- return residualSource.getCurrent();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrentTimestamp();
- } else if (residualSource != null) {
- return residualSource.getCurrentTimestamp();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getWatermark() {
- return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>If only part of the {@link ResidualElements} is consumed, the new
- * checkpoint will contain the remaining elements in {@link ResidualElements} and
- * the {@link ResidualSource}.
- *
- * <p>If all {@link ResidualElements} and part of the
- * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
- * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
- * {@link ResidualSource} is the source split from the current source,
- * and {@link ResidualElements} contains rest elements from the current source after
- * the splitting. For unsplittable source, it will put all remaining elements into
- * the {@link ResidualElements}.
- */
- @Override
- public Checkpoint<T> getCheckpointMark() {
- Checkpoint<T> newCheckpoint;
- if (!residualElements.done()) {
- // Part of residualElements are consumed.
- // Checkpoints the remaining elements and residualSource.
- newCheckpoint = new Checkpoint<>(
- residualElements.getRestElements(),
- residualSource == null ? null : residualSource.getSource());
- } else if (residualSource != null) {
- newCheckpoint = residualSource.getCheckpointMark();
- } else {
- newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
- }
- // Re-initialize since the residualElements and the residualSource might be
- // consumed or split by checkpointing.
- init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
- return newCheckpoint;
- }
-
- @Override
- public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
- return BoundedToUnboundedSourceAdapter.this;
- }
- }
-
- private class ResidualElements {
- private final List<TimestampedValue<T>> elementsList;
- private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
- private @Nullable TimestampedValue<T> currentT;
- private boolean hasCurrent;
- private boolean done;
-
- ResidualElements(List<TimestampedValue<T>> residualElementsList) {
- this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
- this.elementsIterator = null;
- this.currentT = null;
- this.hasCurrent = false;
- this.done = false;
- }
-
- public boolean advance() {
- if (elementsIterator == null) {
- elementsIterator = elementsList.iterator();
- }
- if (elementsIterator.hasNext()) {
- currentT = elementsIterator.next();
- hasCurrent = true;
- return true;
- } else {
- done = true;
- hasCurrent = false;
- return false;
- }
- }
-
- boolean hasCurrent() {
- return hasCurrent;
- }
-
- boolean done() {
- return done;
- }
-
- TimestampedValue<T> getCurrentTimestampedValue() {
- if (!hasCurrent) {
- throw new NoSuchElementException();
- }
- return currentT;
- }
-
- T getCurrent() {
- return getCurrentTimestampedValue().getValue();
- }
-
- Instant getCurrentTimestamp() {
- return getCurrentTimestampedValue().getTimestamp();
- }
-
- List<TimestampedValue<T>> getRestElements() {
- if (elementsIterator == null) {
- return elementsList;
- } else {
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- while (elementsIterator.hasNext()) {
- newResidualElements.add(elementsIterator.next());
- }
- return newResidualElements;
- }
- }
- }
-
- private class ResidualSource {
- private BoundedSource<T> residualSource;
- private PipelineOptions options;
- private @Nullable BoundedReader<T> reader;
- private boolean closed;
- private boolean readerDone;
-
- public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
- this.residualSource = checkNotNull(residualSource, "residualSource");
- this.options = checkNotNull(options, "options");
- this.reader = null;
- this.closed = false;
- this.readerDone = false;
- }
-
- private boolean advance() throws IOException {
- checkArgument(!closed, "advance() call on closed %s", getClass().getName());
- if (readerDone) {
- return false;
- }
- if (reader == null) {
- reader = residualSource.createReader(options);
- readerDone = !reader.start();
- } else {
- readerDone = !reader.advance();
- }
- return !readerDone;
- }
-
- T getCurrent() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrent();
- }
-
- Instant getCurrentTimestamp() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrentTimestamp();
- }
-
- void close() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- closed = true;
- }
-
- BoundedSource<T> getSource() {
- return residualSource;
- }
-
- Checkpoint<T> getCheckpointMark() {
- if (reader == null) {
- // Reader hasn't started, checkpoint the residualSource.
- return new Checkpoint<>(null /* residualElements */, residualSource);
- } else {
- // Part of residualSource are consumed.
- // Splits the residualSource and tracks the new residualElements in current source.
- BoundedSource<T> residualSplit = null;
- Double fractionConsumed = reader.getFractionConsumed();
- if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
- double fractionRest = 1 - fractionConsumed;
- int splitAttempts = 8;
- for (int i = 0; i < 8 && residualSplit == null; ++i) {
- double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
- residualSplit = reader.splitAtFraction(fractionToSplit);
- }
- }
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- try {
- while (advance()) {
- newResidualElements.add(
- TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to read elements from the bounded reader.", e);
- }
- return new Checkpoint<>(newResidualElements, residualSplit);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
deleted file mode 100644
index e1968cb..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.FileBasedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link UnboundedReadFromBoundedSource}.
- */
-@RunWith(JUnit4.class)
-public class UnboundedReadFromBoundedSourceTest {
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Rule
- public transient ExpectedException thrown = ExpectedException.none();
-
- @Rule
- public TestPipeline p = TestPipeline.create();
-
- @Test
- public void testCheckpointCoderNulls() throws Exception {
- CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of());
- Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null);
- Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray(
- coder,
- CoderUtils.encodeToByteArray(coder, emptyCheckpoint));
- assertNull(decodedEmptyCheckpoint.getResidualElements());
- assertNull(decodedEmptyCheckpoint.getResidualSource());
- }
-
- @Test
- public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
- CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE));
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testBoundedToUnboundedSourceAdapter() throws Exception {
- long numElements = 100;
- BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
- UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
- new BoundedToUnboundedSourceAdapter<>(boundedSource);
-
- PCollection<Long> output =
- p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
-
- // Count == numElements
- PAssert
- .thatSingleton(output.apply("Count", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Unique count == numElements
- PAssert
- .thatSingleton(output.apply(Distinct.<Long>create())
- .apply("UniqueCount", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Min == 0
- PAssert
- .thatSingleton(output.apply("Min", Min.<Long>globally()))
- .isEqualTo(0L);
- // Max == numElements-1
- PAssert
- .thatSingleton(output.apply("Max", Max.<Long>globally()))
- .isEqualTo(numElements - 1);
- p.run();
- }
-
- @Test
- public void testCountingSourceToUnboundedCheckpoint() throws Exception {
- long numElements = 100;
- BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
- List<Long> expected = Lists.newArrayList();
- for (long i = 0; i < numElements; ++i) {
- expected.add(i);
- }
- testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
- }
-
- @Test
- public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
- String baseName = "test-input";
- File compressedFile = tmpFolder.newFile(baseName + ".gz");
- byte[] input = generateInput(100);
- writeFile(compressedFile, input);
-
- BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
- List<Byte> expected = Lists.newArrayList();
- for (byte i : input) {
- expected.add(i);
- }
- testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
- }
-
- private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
- BoundedSource<T> boundedSource,
- List<T> expectedElements) throws Exception {
- BoundedToUnboundedSourceAdapter<T> unboundedSource =
- new BoundedToUnboundedSourceAdapter<>(boundedSource);
-
- PipelineOptions options = PipelineOptionsFactory.create();
- BoundedToUnboundedSourceAdapter<T>.Reader reader =
- unboundedSource.createReader(options, null);
-
- List<T> actual = Lists.newArrayList();
- for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
- actual.add(reader.getCurrent());
- // checkpoint every 9 elements
- if (actual.size() % 9 == 0) {
- Checkpoint<T> checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- }
- }
- Checkpoint<T> checkpointDone = reader.getCheckpointMark();
- assertTrue(checkpointDone.getResidualElements() == null
- || checkpointDone.getResidualElements().isEmpty());
-
- assertEquals(expectedElements.size(), actual.size());
- assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
- }
-
- @Test
- public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
- long numElements = 100;
- BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
- List<Long> expected = Lists.newArrayList();
- for (long i = 0; i < numElements; ++i) {
- expected.add(i);
- }
- testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
- }
-
- @Test
- public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
- String baseName = "test-input";
- File compressedFile = tmpFolder.newFile(baseName + ".gz");
- byte[] input = generateInput(1000);
- writeFile(compressedFile, input);
-
- BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
- List<Byte> expected = Lists.newArrayList();
- for (byte i : input) {
- expected.add(i);
- }
- testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
- }
-
- private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(
- BoundedSource<T> boundedSource,
- List<T> expectedElements) throws Exception {
- BoundedToUnboundedSourceAdapter<T> unboundedSource =
- new BoundedToUnboundedSourceAdapter<>(boundedSource);
-
- PipelineOptions options = PipelineOptionsFactory.create();
- BoundedToUnboundedSourceAdapter<T>.Reader reader =
- unboundedSource.createReader(options, null);
-
- List<T> actual = Lists.newArrayList();
- for (boolean hasNext = reader.start(); hasNext;) {
- actual.add(reader.getCurrent());
- // checkpoint every 9 elements
- if (actual.size() % 9 == 0) {
- Checkpoint<T> checkpoint = reader.getCheckpointMark();
- Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder();
- Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray(
- checkpointCoder,
- CoderUtils.encodeToByteArray(checkpointCoder, checkpoint));
- reader.close();
- checkpoint.finalizeCheckpoint();
-
- BoundedToUnboundedSourceAdapter<T>.Reader restarted =
- unboundedSource.createReader(options, decodedCheckpoint);
- reader = restarted;
- hasNext = reader.start();
- } else {
- hasNext = reader.advance();
- }
- }
- Checkpoint<T> checkpointDone = reader.getCheckpointMark();
- assertTrue(checkpointDone.getResidualElements() == null
- || checkpointDone.getResidualElements().isEmpty());
-
- assertEquals(expectedElements.size(), actual.size());
- assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
- }
-
- @Test
- public void testReadBeforeStart() throws Exception {
- thrown.expect(NoSuchElementException.class);
-
- BoundedSource<Long> countingSource = CountingSource.upTo(100);
- BoundedToUnboundedSourceAdapter<Long> unboundedSource =
- new BoundedToUnboundedSourceAdapter<>(countingSource);
- PipelineOptions options = PipelineOptionsFactory.create();
-
- unboundedSource.createReader(options, null).getCurrent();
- }
-
- @Test
- public void testReadFromCheckpointBeforeStart() throws Exception {
- thrown.expect(NoSuchElementException.class);
-
- BoundedSource<Long> countingSource = CountingSource.upTo(100);
- BoundedToUnboundedSourceAdapter<Long> unboundedSource =
- new BoundedToUnboundedSourceAdapter<>(countingSource);
- PipelineOptions options = PipelineOptionsFactory.create();
-
- List<TimestampedValue<Long>> elements =
- ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
- Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource);
- unboundedSource.createReader(options, checkpoint).getCurrent();
- }
-
- /**
- * Generate byte array of given size.
- */
- private static byte[] generateInput(int size) {
- // Arbitrary but fixed seed
- Random random = new Random(285930);
- byte[] buff = new byte[size];
- random.nextBytes(buff);
- return buff;
- }
-
- /**
- * Writes a single output file.
- */
- private static void writeFile(File file, byte[] input) throws IOException {
- try (OutputStream os = new FileOutputStream(file)) {
- os.write(input);
- }
- }
-
- /**
- * Unsplittable source for use in tests.
- */
- private static class UnsplittableSource extends FileBasedSource<Byte> {
- public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
- super(fileOrPatternSpec, minBundleSize);
- }
-
- public UnsplittableSource(
- String fileName, long minBundleSize, long startOffset, long endOffset) {
- super(fileName, minBundleSize, startOffset, endOffset);
- }
-
- @Override
- protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
- return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
- }
-
- @Override
- protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
- return new UnsplittableReader(this);
- }
-
- @Override
- public Coder<Byte> getDefaultOutputCoder() {
- return SerializableCoder.of(Byte.class);
- }
-
- private static class UnsplittableReader extends FileBasedReader<Byte> {
- ByteBuffer buff = ByteBuffer.allocate(1);
- Byte current;
- long offset;
- ReadableByteChannel channel;
-
- public UnsplittableReader(UnsplittableSource source) {
- super(source);
- offset = source.getStartOffset() - 1;
- }
-
- @Override
- public Byte getCurrent() throws NoSuchElementException {
- return current;
- }
-
- @Override
- public boolean allowsDynamicSplitting() {
- return false;
- }
-
- @Override
- protected boolean isAtSplitPoint() {
- return true;
- }
-
- @Override
- protected void startReading(ReadableByteChannel channel) throws IOException {
- this.channel = channel;
- }
-
- @Override
- protected boolean readNextRecord() throws IOException {
- buff.clear();
- if (channel.read(buff) != 1) {
- return false;
- }
- current = buff.get(0);
- offset += 1;
- return true;
- }
-
- @Override
- protected long getCurrentOffset() {
- return offset;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a2b715f..7212d4f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
@@ -1193,7 +1194,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public final PCollection<T> expand(PBegin input) {
source.validate();
- return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
+ return Pipeline.applyTransform(input, new UnboundedReadFromBoundedSource<>(source))
.setIsBoundedInternal(IsBounded.BOUNDED);
}
}
[3/3] beam git commit: This closes #2481
Posted by dh...@apache.org.
This closes #2481
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0e39a97
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0e39a97
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0e39a97
Branch: refs/heads/master
Commit: e0e39a97560933f99eaee98549877fd5f8f49a52
Parents: e53f959 6622914
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 10 10:16:06 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 10 10:16:06 2017 -0700
----------------------------------------------------------------------
.../translation/ApexPipelineTranslator.java | 2 +-
runners/core-construction-java/pom.xml | 15 +
.../UnboundedReadFromBoundedSource.java | 542 ++++++++++++++++++
.../UnboundedReadFromBoundedSourceTest.java | 373 +++++++++++++
runners/core-java/pom.xml | 5 -
.../core/UnboundedReadFromBoundedSource.java | 542 ------------------
.../UnboundedReadFromBoundedSourceTest.java | 373 -------------
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
.../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
...aflowUnboundedReadFromBoundedSourceTest.java | 79 ---
.../beam/runners/spark/TestSparkRunner.java | 2 +-
11 files changed, 934 insertions(+), 1549 deletions(-)
----------------------------------------------------------------------