You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/24 02:07:56 UTC
[1/2] incubator-beam git commit: Add UnboundedReadFromBoundedSource
Repository: incubator-beam
Updated Branches:
refs/heads/master 82ae661c5 -> 7745b921f
Add UnboundedReadFromBoundedSource
Adds one way to convert a BoundedSource to an UnboundedSource.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11d9ec5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11d9ec5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11d9ec5e
Branch: refs/heads/master
Commit: 11d9ec5ebff4820b36db4b6ea4df7a0f79115ddd
Parents: 82ae661
Author: Pei He <pe...@google.com>
Authored: Mon May 9 15:59:58 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 23 19:06:30 2016 -0700
----------------------------------------------------------------------
runners/core-java/pom.xml | 25 +
.../core/UnboundedReadFromBoundedSource.java | 543 +++++++++++++++++++
.../UnboundedReadFromBoundedSourceTest.java | 365 +++++++++++++
.../sdk/io/BoundedReadFromUnboundedSource.java | 3 +-
.../org/apache/beam/sdk/io/UnboundedSource.java | 2 +-
5 files changed, 936 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 1587a1a..98d80bb 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -195,6 +195,31 @@
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
<!-- build dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..2b3d1c7
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} to an {@link UnboundedSource}.
+ *
+ * <p>Each {@link BoundedSource} is read directly, and element timestamps are propagated;
+ * {@link BoundedSource#splitIntoBundles} is only called, on a best-effort basis, when the
+ * initial splits are generated. While any elements remain, the watermark is the beginning
+ * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
+ * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
+ * {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ */
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
+
+ private final BoundedSource<T> source;
+
+  /**
+   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+   *
+   * <p>The source is checked eagerly so that a missing source is reported at construction time
+   * rather than when the transform is applied or its coder is requested.
+   *
+   * @param source the bounded source to read from
+   * @throws NullPointerException if {@code source} is {@code null}
+   */
+  public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
+    this.source = checkNotNull(source, "source");
+  }
+
+  /**
+   * Applies an unbounded {@link Read} of the adapted source to the pipeline of {@code input}.
+   */
+  @Override
+  public PCollection<T> apply(PInput input) {
+    BoundedToUnboundedSourceAdapter<T> adapter = new BoundedToUnboundedSourceAdapter<>(source);
+    return input.getPipeline().apply(Read.from(adapter));
+  }
+
+ /**
+ * Returns the default output coder reported by the wrapped bounded {@code source}.
+ */
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return source.getDefaultOutputCoder();
+ }
+
+ @Override
+ public String getKindString() {
+ return "Read(" + approximateSimpleName(source.getClass()) + ")";
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ // We explicitly do not register base-class data, instead we use the delegate inner source.
+ builder
+ .add(DisplayData.item("source", source.getClass()))
+ .include(source);
+ }
+
+ /**
+ * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+ */
+ @VisibleForTesting
+ public static class BoundedToUnboundedSourceAdapter<T>
+ extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+ private BoundedSource<T> boundedSource;
+
+    /**
+     * Wraps {@code boundedSource} so that it can be read as an {@link UnboundedSource}.
+     *
+     * @param boundedSource the bounded source to adapt
+     * @throws NullPointerException if {@code boundedSource} is {@code null}
+     */
+    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+      // Fail fast here instead of with an anonymous NullPointerException on first use.
+      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+    }
+
+ /**
+ * Validates this adapter by delegating to the wrapped bounded source.
+ */
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+    /**
+     * Splits the wrapped {@link BoundedSource} and wraps every split in its own adapter.
+     *
+     * <p>Splitting is best-effort: when the estimated size divided by {@code desiredNumSplits}
+     * is not positive, when {@code splitIntoBundles} returns {@code null}, or when any exception
+     * is thrown while splitting, this adapter itself is returned as the only split.
+     *
+     * @param desiredNumSplits the number of splits the caller would like to obtain
+     * @param options the pipeline options forwarded to the wrapped source
+     * @return an immutable list with one adapter per split, or a singleton list holding this
+     *     adapter when the source could not be split
+     */
+    @Override
+    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      try {
+        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+        if (desiredBundleSize <= 0) {
+          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+              boundedSource);
+          return ImmutableList.of(this);
+        }
+        List<? extends BoundedSource<T>> splits
+            = boundedSource.splitIntoBundles(desiredBundleSize, options);
+        if (splits == null) {
+          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+          return ImmutableList.of(this);
+        }
+        // Lists.transform only returns a lazy view that re-applies the function on every access,
+        // which would hand out a different adapter instance each time a split is looked up.
+        // Copy the view once so that every split is represented by exactly one adapter.
+        return ImmutableList.copyOf(
+            Lists.transform(
+                splits,
+                new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+                  @Override
+                  public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+                    return new BoundedToUnboundedSourceAdapter<>(input);
+                  }
+                }));
+      } catch (Exception e) {
+        // Deliberately broad: any failure while splitting falls back to reading unsplit.
+        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+        return ImmutableList.of(this);
+      }
+    }
+
+    /**
+     * Creates a reader that either starts from the beginning of the wrapped source (no
+     * checkpoint) or resumes from the residual elements and residual source of {@code checkpoint}.
+     */
+    @Override
+    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+        throws IOException {
+      if (checkpoint != null) {
+        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+      }
+      // Fresh start: nothing has been buffered yet and the whole source is still unread.
+      List<TimestampedValue<T>> noResidualElements = Collections.emptyList();
+      return new Reader(noResidualElements, boundedSource, options);
+    }
+
+ /**
+ * Returns the default output coder reported by the wrapped bounded source.
+ */
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder();
+ }
+
+    /**
+     * Returns a {@link CheckpointCoder} built on the default output coder of the wrapped source.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+      Coder<T> elementCoder = boundedSource.getDefaultOutputCoder();
+      return new CheckpointCoder<>(elementCoder);
+    }
+
+ /**
+ * Checkpoint mark of the adapter: a list of residual elements together with a residual
+ * bounded source. Either part may be {@code null}.
+ */
+ @VisibleForTesting
+ static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+ private final List<TimestampedValue<T>> residualElements;
+ private final @Nullable BoundedSource<T> residualSource;
+
+ /**
+ * Stores the two parts of the checkpoint as given; no copy of the list is made.
+ */
+ public Checkpoint(
+ List<TimestampedValue<T>> residualElements,
+ @Nullable BoundedSource<T> residualSource) {
+ this.residualElements = residualElements;
+ this.residualSource = residualSource;
+ }
+
+ // Intentionally a no-op.
+ @Override
+ public void finalizeCheckpoint() {}
+
+ /** Returns the residual elements exactly as they were passed to the constructor. */
+ @VisibleForTesting
+ List<TimestampedValue<T>> getResidualElements() {
+ return residualElements;
+ }
+
+ /** Returns the residual source exactly as it was passed to the constructor. */
+ @VisibleForTesting
+ @Nullable BoundedSource<T> getResidualSource() {
+ return residualSource;
+ }
+ }
+
+ @VisibleForTesting
+ static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+ @JsonCreator
+ public static CheckpointCoder<?> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ checkArgument(components.size() == 1,
+ "Expecting 1 components, got %s", components.size());
+ return new CheckpointCoder<>(components.get(0));
+ }
+
+ // The coder for a list of residual elements and their timestamps
+ private final Coder<List<TimestampedValue<T>>> elemsCoder;
+ // The coder from the BoundedReader for coding each element
+ private final Coder<T> elemCoder;
+ // The nullable and serializable coder for the BoundedSource.
+ @SuppressWarnings("rawtypes")
+ private final Coder<BoundedSource> sourceCoder;
+
+ CheckpointCoder(Coder<T> elemCoder) {
+ this.elemsCoder = NullableCoder.of(
+ ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+ this.elemCoder = elemCoder;
+ this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+ }
+
+ @Override
+ public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ Context nested = context.nested();
+ elemsCoder.encode(value.residualElements, outStream, nested);
+ sourceCoder.encode(value.residualSource, outStream, nested);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Checkpoint<T> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ Context nested = context.nested();
+ return new Checkpoint<>(
+ elemsCoder.decode(inStream, nested),
+ sourceCoder.decode(inStream, nested));
+ }
+
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return Arrays.<Coder<?>>asList(elemCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+ }
+ }
+
+    /**
+     * An {@code UnboundedReader<T>} that reads a {@code BoundedSource<T>} through a pair of
+     * {@link ResidualElements} and {@link ResidualSource}.
+     *
+     * <p>In the initial state, {@link ResidualElements} is empty and {@link ResidualSource}
+     * contains the {@code BoundedSource<T>}. After the first checkpoint, the
+     * {@code BoundedSource<T>} will be split into {@link ResidualElements} and
+     * {@link ResidualSource}.
+     */
+    @VisibleForTesting
+    class Reader extends UnboundedReader<T> {
+      private ResidualElements residualElements;
+      private @Nullable ResidualSource residualSource;
+      private final PipelineOptions options;
+      private boolean done;
+
+      /**
+       * Creates a reader over the given residual state.
+       *
+       * @param residualElementsList elements to emit first; {@code null} is treated as empty
+       * @param residualSource the source to read after the residual elements, or {@code null}
+       * @param options the pipeline options used to create bounded readers
+       */
+      Reader(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        this.options = checkNotNull(options, "options");
+        init(residualElementsList, residualSource, options);
+        this.done = false;
+      }
+
+      /** (Re-)creates the residual state this reader consumes from. */
+      private void init(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        // Checkpoints may legitimately carry a null element list (the coder is nullable and
+        // getCheckpointMark creates such checkpoints), but ResidualElements rejects null.
+        this.residualElements = new ResidualElements(
+            residualElementsList == null
+                ? Collections.<TimestampedValue<T>>emptyList()
+                : residualElementsList);
+        this.residualSource =
+            residualSource == null ? null : new ResidualSource(residualSource, options);
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      /**
+       * Moves to the next element, taking residual elements first and then reading from the
+       * residual source. Marks the reader as done when neither has another element.
+       */
+      @Override
+      public boolean advance() throws IOException {
+        if (residualElements.advance()) {
+          return true;
+        } else if (residualSource != null && residualSource.advance()) {
+          return true;
+        } else {
+          done = true;
+          return false;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (residualSource != null) {
+          residualSource.close();
+        }
+      }
+
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrent();
+        } else if (residualSource != null) {
+          return residualSource.getCurrent();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrentTimestamp();
+        } else if (residualSource != null) {
+          return residualSource.getCurrentTimestamp();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      /**
+       * Returns the end of time once {@code advance} has reported that no element is left, and
+       * the beginning of time before that.
+       */
+      @Override
+      public Instant getWatermark() {
+        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      /**
+       * {@inheritDoc}
+       *
+       * <p>If only part of the {@link ResidualElements} is consumed, the new
+       * checkpoint will contain the remaining elements in {@link ResidualElements} and
+       * the {@link ResidualSource}.
+       *
+       * <p>If all {@link ResidualElements} and part of the
+       * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+       * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+       * {@link ResidualSource} is the source split from the current source,
+       * and {@link ResidualElements} contains rest elements from the current source after
+       * the splitting. For unsplittable source, it will put all remaining elements into
+       * the {@link ResidualElements}.
+       *
+       * <p>If nothing is left at all, the checkpoint carries neither elements nor a source.
+       *
+       * @throws RuntimeException if the superseded bounded reader cannot be closed
+       */
+      @Override
+      public Checkpoint<T> getCheckpointMark() {
+        Checkpoint<T> newCheckpoint;
+        if (!residualElements.done()) {
+          // Part of residualElements are consumed.
+          // Checkpoints the remaining elements and residualSource.
+          newCheckpoint = new Checkpoint<>(
+              residualElements.getRestElements(),
+              residualSource == null ? null : residualSource.getSource());
+        } else if (residualSource != null) {
+          newCheckpoint = residualSource.getCheckpointMark();
+        } else {
+          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+        }
+        // Re-initialize since the residualElements and the residualSource might be
+        // consumed or split by checkpointing.
+        ResidualSource supersededSource = residualSource;
+        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+        // The superseded ResidualSource is unreachable from now on; release its bounded reader
+        // instead of leaking one open reader per checkpoint.
+        if (supersededSource != null) {
+          try {
+            supersededSource.close();
+          } catch (IOException e) {
+            throw new RuntimeException(
+                "Failed to close the bounded reader superseded by a checkpoint.", e);
+          }
+        }
+        return newCheckpoint;
+      }
+
+      @Override
+      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+        return BoundedToUnboundedSourceAdapter.this;
+      }
+    }
+
+    /**
+     * Cursor over a list of already-read elements. The iterator is created lazily on the first
+     * call to {@code advance}.
+     */
+    private class ResidualElements {
+      private final List<TimestampedValue<T>> elementsList;
+      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+      private @Nullable TimestampedValue<T> currentT;
+      private boolean hasCurrent;
+      private boolean done;
+
+      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+        this.elementsIterator = null;
+        this.currentT = null;
+        this.hasCurrent = false;
+        this.done = false;
+      }
+
+      /** Moves to the next element; marks the cursor as done when there is none. */
+      public boolean advance() {
+        if (elementsIterator == null) {
+          elementsIterator = elementsList.iterator();
+        }
+        hasCurrent = elementsIterator.hasNext();
+        if (hasCurrent) {
+          currentT = elementsIterator.next();
+        } else {
+          done = true;
+        }
+        return hasCurrent;
+      }
+
+      boolean hasCurrent() {
+        return hasCurrent;
+      }
+
+      boolean done() {
+        return done;
+      }
+
+      /** Returns the current element, or throws when the cursor is not on an element. */
+      TimestampedValue<T> getCurrentTimestampedValue() {
+        if (hasCurrent) {
+          return currentT;
+        }
+        throw new NoSuchElementException();
+      }
+
+      T getCurrent() {
+        TimestampedValue<T> current = getCurrentTimestampedValue();
+        return current.getValue();
+      }
+
+      Instant getCurrentTimestamp() {
+        TimestampedValue<T> current = getCurrentTimestampedValue();
+        return current.getTimestamp();
+      }
+
+      /**
+       * Returns the elements that have not been handed out yet: the backing list itself when
+       * iteration has not started, otherwise a new list drained from the iterator.
+       */
+      List<TimestampedValue<T>> getRestElements() {
+        if (elementsIterator == null) {
+          return elementsList;
+        }
+        return Lists.newArrayList(elementsIterator);
+      }
+    }
+
+ private class ResidualSource {
+ private BoundedSource<T> residualSource;
+ private PipelineOptions options;
+ private @Nullable BoundedReader<T> reader;
+ private boolean closed;
+
+ public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+ this.residualSource = checkNotNull(residualSource, "residualSource");
+ this.options = checkNotNull(options, "options");
+ this.reader = null;
+ this.closed = false;
+ }
+
+ private boolean advance() throws IOException {
+ if (reader == null && !closed) {
+ reader = residualSource.createReader(options);
+ return reader.start();
+ } else {
+ return reader.advance();
+ }
+ }
+
+ T getCurrent() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrent();
+ }
+
+ Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrentTimestamp();
+ }
+
+ void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ closed = true;
+ }
+
+ BoundedSource<T> getSource() {
+ return residualSource;
+ }
+
+ Checkpoint<T> getCheckpointMark() {
+ if (reader == null) {
+ // Reader hasn't started, checkpoint the residualSource.
+ return new Checkpoint<>(null /* residualElements */, residualSource);
+ } else {
+ // Part of residualSource are consumed.
+ // Splits the residualSource and tracks the new residualElements in current source.
+ BoundedSource<T> residualSplit = null;
+ Double fractionConsumed = reader.getFractionConsumed();
+ if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+ double fractionRest = 1 - fractionConsumed;
+ int splitAttempts = 8;
+ for (int i = 0; i < 8 && residualSplit == null; ++i) {
+ double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+ residualSplit = reader.splitAtFraction(fractionToSplit);
+ }
+ }
+ List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+ try {
+ while (advance()) {
+ newResidualElements.add(
+ TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+ }
+ return new Checkpoint<>(newResidualElements, residualSplit);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..afd0927
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/**
+ * Unit tests for {@link UnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadFromBoundedSourceTest {
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Rule
+ public transient ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testCheckpointCoderNulls() throws Exception {
+ CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of());
+ Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null);
+ Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray(
+ coder,
+ CoderUtils.encodeToByteArray(coder, emptyCheckpoint));
+ assertNull(decodedEmptyCheckpoint.getResidualElements());
+ assertNull(decodedEmptyCheckpoint.getResidualSource());
+ }
+
+  /** Reads a counting source through the adapter in a pipeline and checks the output. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testBoundedToUnboundedSourceAdapter() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Long> output =
+        pipeline.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
+
+    // Count == numElements
+    PCollection<Long> count = output.apply("Count", Count.<Long>globally());
+    PAssert.thatSingleton(count).isEqualTo(numElements);
+
+    // Unique count == numElements
+    PCollection<Long> uniqueCount = output
+        .apply(RemoveDuplicates.<Long>create())
+        .apply("UniqueCount", Count.<Long>globally());
+    PAssert.thatSingleton(uniqueCount).isEqualTo(numElements);
+
+    // Min == 0
+    PCollection<Long> min = output.apply("Min", Min.<Long>globally());
+    PAssert.thatSingleton(min).isEqualTo(0L);
+
+    // Max == numElements-1
+    PCollection<Long> max = output.apply("Max", Max.<Long>globally());
+    PAssert.thatSingleton(max).isEqualTo(numElements - 1);
+
+    pipeline.run();
+  }
+
+  /** Checkpoints periodically while reading a splittable counting source. */
+  @Test
+  public void testCountingSourceToUnboundedCheckpoint() throws Exception {
+    long numElements = 100;
+    List<Long> expected = Lists.newArrayList();
+    long next = 0;
+    while (next < numElements) {
+      expected.add(next);
+      ++next;
+    }
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
+  }
+
+  /** Checkpoints periodically while reading a source that refuses dynamic splitting. */
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(100);
+    writeFile(compressedFile, input);
+
+    List<Byte> expected = Lists.newArrayList();
+    for (int index = 0; index < input.length; ++index) {
+      expected.add(input[index]);
+    }
+    BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
+    testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
+  }
+
+  /**
+   * Reads {@code boundedSource} through the adapter, taking a checkpoint after every ninth
+   * element, and checks that the elements read match {@code expectedElements} in number and
+   * as a set.
+   */
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> adapter =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        adapter.createReader(pipelineOptions, null);
+
+    List<T> actual = Lists.newArrayList();
+    boolean hasNext = reader.start();
+    while (hasNext) {
+      actual.add(reader.getCurrent());
+      // checkpoint every 9 elements
+      boolean checkpointDue = actual.size() % 9 == 0;
+      if (checkpointDue) {
+        Checkpoint<T> checkpoint = reader.getCheckpointMark();
+        checkpoint.finalizeCheckpoint();
+      }
+      hasNext = reader.advance();
+    }
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  /** Restarts from every checkpoint while reading a splittable counting source. */
+  @Test
+  public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
+    long numElements = 100;
+    List<Long> expected = Lists.newArrayList();
+    long next = 0;
+    while (next < numElements) {
+      expected.add(next);
+      ++next;
+    }
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
+  }
+
+  /** Restarts from every checkpoint while reading a source that refuses dynamic splitting. */
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(1000);
+    writeFile(compressedFile, input);
+
+    List<Byte> expected = Lists.newArrayList();
+    for (int index = 0; index < input.length; ++index) {
+      expected.add(input[index]);
+    }
+    BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
+  }
+
+  /**
+   * Reads {@code boundedSource} through the adapter. After every ninth element the reader is
+   * checkpointed, the checkpoint is round-tripped through its coder, and reading continues with
+   * a new reader created from the decoded checkpoint. The elements read must match
+   * {@code expectedElements} in number and as a set.
+   */
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> adapter =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        adapter.createReader(pipelineOptions, null);
+
+    List<T> actual = Lists.newArrayList();
+    boolean hasNext = reader.start();
+    while (hasNext) {
+      actual.add(reader.getCurrent());
+      if (actual.size() % 9 != 0) {
+        hasNext = reader.advance();
+        continue;
+      }
+      // checkpoint every 9 elements
+      Checkpoint<T> checkpoint = reader.getCheckpointMark();
+      Coder<Checkpoint<T>> checkpointCoder = adapter.getCheckpointMarkCoder();
+      byte[] encodedCheckpoint = CoderUtils.encodeToByteArray(checkpointCoder, checkpoint);
+      Checkpoint<T> decodedCheckpoint =
+          CoderUtils.decodeFromByteArray(checkpointCoder, encodedCheckpoint);
+      reader.close();
+      checkpoint.finalizeCheckpoint();
+
+      // Continue with a fresh reader that resumes from the decoded checkpoint.
+      reader = adapter.createReader(pipelineOptions, decodedCheckpoint);
+      hasNext = reader.start();
+    }
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+ @Test
+ public void testReadBeforeStart() throws Exception {
+ thrown.expect(NoSuchElementException.class);
+
+ BoundedSource<Long> countingSource = CountingSource.upTo(100);
+ BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+ new BoundedToUnboundedSourceAdapter<>(countingSource);
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ unboundedSource.createReader(options, null).getCurrent();
+ }
+
+ @Test
+ public void testReadFromCheckpointBeforeStart() throws Exception {
+ thrown.expect(NoSuchElementException.class);
+
+ BoundedSource<Long> countingSource = CountingSource.upTo(100);
+ BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+ new BoundedToUnboundedSourceAdapter<>(countingSource);
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<TimestampedValue<Long>> elements =
+ ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
+ Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource);
+ unboundedSource.createReader(options, checkpoint).getCurrent();
+ }
+
+  /**
+   * Returns {@code size} pseudo-random bytes. The generator is seeded with a fixed value, so
+   * the content is the same on every call.
+   */
+  private static byte[] generateInput(int size) {
+    byte[] bytes = new byte[size];
+    // Arbitrary but fixed seed
+    new Random(285930).nextBytes(bytes);
+    return bytes;
+  }
+
+  /**
+   * Writes {@code input} to {@code file} and closes the stream afterwards.
+   */
+  private static void writeFile(File file, byte[] input) throws IOException {
+    try (OutputStream fileStream = new FileOutputStream(file)) {
+      fileStream.write(input);
+    }
+  }
+
+ /**
+ * Unsplittable source for use in tests.
+ */
+ private static class UnsplittableSource extends FileBasedSource<Byte> {
+ public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
+ super(fileOrPatternSpec, minBundleSize);
+ }
+
+ public UnsplittableSource(
+ String fileName, long minBundleSize, long startOffset, long endOffset) {
+ super(fileName, minBundleSize, startOffset, endOffset);
+ }
+
+ @Override
+ protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
+ return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
+ }
+
+ @Override
+ protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
+ return new UnsplittableReader(this);
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Coder<Byte> getDefaultOutputCoder() {
+ return SerializableCoder.of(Byte.class);
+ }
+
+ private static class UnsplittableReader extends FileBasedReader<Byte> {
+ ByteBuffer buff = ByteBuffer.allocate(1);
+ Byte current;
+ long offset;
+ ReadableByteChannel channel;
+
+ public UnsplittableReader(UnsplittableSource source) {
+ super(source);
+ offset = source.getStartOffset() - 1;
+ }
+
+ @Override
+ public Byte getCurrent() throws NoSuchElementException {
+ return current;
+ }
+
+ @Override
+ public boolean allowsDynamicSplitting() {
+ return false;
+ }
+
+ @Override
+ protected boolean isAtSplitPoint() {
+ return true;
+ }
+
+ @Override
+ protected void startReading(ReadableByteChannel channel) throws IOException {
+ this.channel = channel;
+ }
+
+ @Override
+ protected boolean readNextRecord() throws IOException {
+ buff.clear();
+ if (channel.read(buff) != 1) {
+ return false;
+ }
+ current = new Byte(buff.get(0));
+ offset += 1;
+ return true;
+ }
+
+ @Override
+ protected long getCurrentOffset() {
+ return offset;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 49b2ad4..ba13f9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -197,7 +197,8 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T
}
@Override
- public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options) {
+ public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options)
+ throws IOException {
return new Reader(source.createReader(options, null));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index 2c4a325..ea3004e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -76,7 +76,7 @@ public abstract class UnboundedSource<
* checkpoint if present.
*/
public abstract UnboundedReader<OutputT> createReader(
- PipelineOptions options, @Nullable CheckpointMarkT checkpointMark);
+ PipelineOptions options, @Nullable CheckpointMarkT checkpointMark) throws IOException;
/**
* Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or
[2/2] incubator-beam git commit: Closes #339
Posted by dh...@apache.org.
Closes #339
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7745b921
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7745b921
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7745b921
Branch: refs/heads/master
Commit: 7745b921f6c359111be1daadc9c6e139f958ce24
Parents: 82ae661 11d9ec5
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 23 19:06:31 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 23 19:06:31 2016 -0700
----------------------------------------------------------------------
runners/core-java/pom.xml | 25 +
.../core/UnboundedReadFromBoundedSource.java | 543 +++++++++++++++++++
.../UnboundedReadFromBoundedSourceTest.java | 365 +++++++++++++
.../sdk/io/BoundedReadFromUnboundedSource.java | 3 +-
.../org/apache/beam/sdk/io/UnboundedSource.java | 2 +-
5 files changed, 936 insertions(+), 2 deletions(-)
----------------------------------------------------------------------