You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/10 17:16:12 UTC

[1/3] beam git commit: [BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java

Repository: beam
Updated Branches:
  refs/heads/master e53f959f9 -> e0e39a975


http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
deleted file mode 100644
index cfb5ebc..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
- * {@link BoundedSource}.
- * Sources that cannot be split are read entirely into memory, so this transform does not work well
- * with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- *
- * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
- * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
- */
-@Deprecated
-class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
-
-  private final BoundedSource<T> source;
-
-  /**
-   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
-   */
-  public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
-    this.source = source;
-  }
-
-  @Override
-  public PCollection<T> expand(PBegin input) {
-    return input.getPipeline().apply(
-        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
-  }
-
-  @Override
-  public String getKindString() {
-    return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    // We explicitly do not register base-class data, instead we use the delegate inner source.
-    builder
-        .add(DisplayData.item("source", source.getClass()))
-        .include("source", source);
-  }
-
-  /**
-   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
-   */
-  @VisibleForTesting
-  public static class BoundedToUnboundedSourceAdapter<T>
-      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
-    private BoundedSource<T> boundedSource;
-
-    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
-      this.boundedSource = boundedSource;
-    }
-
-    @Override
-    public void validate() {
-      boundedSource.validate();
-    }
-
-    @Override
-    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      try {
-        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
-        if (desiredBundleSize <= 0) {
-          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
-              boundedSource);
-          return ImmutableList.of(this);
-        }
-        List<? extends BoundedSource<T>> splits =
-            boundedSource.splitIntoBundles(desiredBundleSize, options);
-        if (splits == null) {
-          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
-          return ImmutableList.of(this);
-        }
-        return Lists.transform(
-            splits,
-            new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
-              @Override
-              public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
-                return new BoundedToUnboundedSourceAdapter<>(input);
-              }});
-      } catch (Exception e) {
-        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
-        return ImmutableList.of(this);
-      }
-    }
-
-    @Override
-    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
-        throws IOException {
-      if (checkpoint == null) {
-        return new Reader(null /* residualElements */, boundedSource, options);
-      } else {
-        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
-      }
-    }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return boundedSource.getDefaultOutputCoder();
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
-      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("source", boundedSource.getClass()));
-      builder.include("source", boundedSource);
-    }
-
-    @VisibleForTesting
-    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
-      private final @Nullable List<TimestampedValue<T>> residualElements;
-      private final @Nullable BoundedSource<T> residualSource;
-
-      public Checkpoint(
-          @Nullable List<TimestampedValue<T>> residualElements,
-          @Nullable BoundedSource<T> residualSource) {
-        this.residualElements = residualElements;
-        this.residualSource = residualSource;
-      }
-
-      @Override
-      public void finalizeCheckpoint() {}
-
-      @VisibleForTesting
-      @Nullable List<TimestampedValue<T>> getResidualElements() {
-        return residualElements;
-      }
-
-      @VisibleForTesting
-      @Nullable BoundedSource<T> getResidualSource() {
-        return residualSource;
-      }
-    }
-
-    @VisibleForTesting
-    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
-      @JsonCreator
-      public static CheckpointCoder<?> of(
-          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-          List<Coder<?>> components) {
-        checkArgument(components.size() == 1,
-            "Expecting 1 components, got %s", components.size());
-        return new CheckpointCoder<>(components.get(0));
-      }
-
-      // The coder for a list of residual elements and their timestamps
-      private final Coder<List<TimestampedValue<T>>> elemsCoder;
-      // The coder from the BoundedReader for coding each element
-      private final Coder<T> elemCoder;
-      // The nullable and serializable coder for the BoundedSource.
-      @SuppressWarnings("rawtypes")
-      private final Coder<BoundedSource> sourceCoder;
-
-      CheckpointCoder(Coder<T> elemCoder) {
-        this.elemsCoder = NullableCoder.of(
-            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
-        this.elemCoder = elemCoder;
-        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
-      }
-
-      @Override
-      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
-          throws CoderException, IOException {
-        elemsCoder.encode(value.residualElements, outStream, context.nested());
-        sourceCoder.encode(value.residualSource, outStream, context);
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public Checkpoint<T> decode(InputStream inStream, Context context)
-          throws CoderException, IOException {
-        return new Checkpoint<>(
-            elemsCoder.decode(inStream, context.nested()),
-            sourceCoder.decode(inStream, context));
-      }
-
-      @Override
-      public List<Coder<?>> getCoderArguments() {
-        return Arrays.<Coder<?>>asList(elemCoder);
-      }
-
-      @Override
-      public void verifyDeterministic() throws NonDeterministicException {
-        throw new NonDeterministicException(this,
-            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
-      }
-    }
-
-    /**
-     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
-     * {@link ResidualElements} and {@link ResidualSource}.
-     *
-     * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
-     * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
-     * be split into {@link ResidualElements} and {@link ResidualSource}.
-     */
-    @VisibleForTesting
-    class Reader extends UnboundedReader<T> {
-      private ResidualElements residualElements;
-      private @Nullable ResidualSource residualSource;
-      private final PipelineOptions options;
-      private boolean done;
-
-      Reader(
-          @Nullable List<TimestampedValue<T>> residualElementsList,
-          @Nullable BoundedSource<T> residualSource,
-          PipelineOptions options) {
-        init(residualElementsList, residualSource, options);
-        this.options = checkNotNull(options, "options");
-        this.done = false;
-      }
-
-      private void init(
-          @Nullable List<TimestampedValue<T>> residualElementsList,
-          @Nullable BoundedSource<T> residualSource,
-          PipelineOptions options) {
-        this.residualElements = residualElementsList == null
-            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
-                : new ResidualElements(residualElementsList);
-        this.residualSource =
-            residualSource == null ? null : new ResidualSource(residualSource, options);
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        if (residualElements.advance()) {
-          return true;
-        } else if (residualSource != null && residualSource.advance()) {
-          return true;
-        } else {
-          done = true;
-          return false;
-        }
-      }
-
-      @Override
-      public void close() throws IOException {
-        if (residualSource != null) {
-          residualSource.close();
-        }
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        if (residualElements.hasCurrent()) {
-          return residualElements.getCurrent();
-        } else if (residualSource != null) {
-          return residualSource.getCurrent();
-        } else {
-          throw new NoSuchElementException();
-        }
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        if (residualElements.hasCurrent()) {
-          return residualElements.getCurrentTimestamp();
-        } else if (residualSource != null) {
-          return residualSource.getCurrentTimestamp();
-        } else {
-          throw new NoSuchElementException();
-        }
-      }
-
-      @Override
-      public Instant getWatermark() {
-        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      /**
-       * {@inheritDoc}
-       *
-       * <p>If only part of the {@link ResidualElements} is consumed, the new
-       * checkpoint will contain the remaining elements in {@link ResidualElements} and
-       * the {@link ResidualSource}.
-       *
-       * <p>If all {@link ResidualElements} and part of the
-       * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
-       * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
-       * {@link ResidualSource} is the source split from the current source,
-       * and {@link ResidualElements} contains rest elements from the current source after
-       * the splitting. For unsplittable source, it will put all remaining elements into
-       * the {@link ResidualElements}.
-       */
-      @Override
-      public Checkpoint<T> getCheckpointMark() {
-        Checkpoint<T> newCheckpoint;
-        if (!residualElements.done()) {
-          // Part of residualElements are consumed.
-          // Checkpoints the remaining elements and residualSource.
-          newCheckpoint = new Checkpoint<>(
-              residualElements.getRestElements(),
-              residualSource == null ? null : residualSource.getSource());
-        } else if (residualSource != null) {
-          newCheckpoint = residualSource.getCheckpointMark();
-        } else {
-          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
-        }
-        // Re-initialize since the residualElements and the residualSource might be
-        // consumed or split by checkpointing.
-        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
-        return newCheckpoint;
-      }
-
-      @Override
-      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
-        return BoundedToUnboundedSourceAdapter.this;
-      }
-    }
-
-    private class ResidualElements {
-      private final List<TimestampedValue<T>> elementsList;
-      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
-      private @Nullable TimestampedValue<T> currentT;
-      private boolean hasCurrent;
-      private boolean done;
-
-      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
-        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
-        this.elementsIterator = null;
-        this.currentT = null;
-        this.hasCurrent = false;
-        this.done = false;
-      }
-
-      public boolean advance() {
-        if (elementsIterator == null) {
-          elementsIterator = elementsList.iterator();
-        }
-        if (elementsIterator.hasNext()) {
-          currentT = elementsIterator.next();
-          hasCurrent = true;
-          return true;
-        } else {
-          done = true;
-          hasCurrent = false;
-          return false;
-        }
-      }
-
-      boolean hasCurrent() {
-        return hasCurrent;
-      }
-
-      boolean done() {
-        return done;
-      }
-
-      TimestampedValue<T> getCurrentTimestampedValue() {
-        if (!hasCurrent) {
-          throw new NoSuchElementException();
-        }
-        return currentT;
-      }
-
-      T getCurrent() {
-        return getCurrentTimestampedValue().getValue();
-      }
-
-      Instant getCurrentTimestamp() {
-        return getCurrentTimestampedValue().getTimestamp();
-      }
-
-      List<TimestampedValue<T>> getRestElements() {
-        if (elementsIterator == null) {
-          return elementsList;
-        } else {
-          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
-          while (elementsIterator.hasNext()) {
-            newResidualElements.add(elementsIterator.next());
-          }
-          return newResidualElements;
-        }
-      }
-    }
-
-    private class ResidualSource {
-      private BoundedSource<T> residualSource;
-      private PipelineOptions options;
-      private @Nullable BoundedReader<T> reader;
-      private boolean closed;
-
-      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
-        this.residualSource = checkNotNull(residualSource, "residualSource");
-        this.options = checkNotNull(options, "options");
-        this.reader = null;
-        this.closed = false;
-      }
-
-      private boolean advance() throws IOException {
-        if (reader == null && !closed) {
-          reader = residualSource.createReader(options);
-          return reader.start();
-        } else {
-          return reader.advance();
-        }
-      }
-
-      T getCurrent() throws NoSuchElementException {
-        if (reader == null) {
-          throw new NoSuchElementException();
-        }
-        return reader.getCurrent();
-      }
-
-      Instant getCurrentTimestamp() throws NoSuchElementException {
-        if (reader == null) {
-          throw new NoSuchElementException();
-        }
-        return reader.getCurrentTimestamp();
-      }
-
-      void close() throws IOException {
-        if (reader != null) {
-          reader.close();
-          reader = null;
-        }
-        closed = true;
-      }
-
-      BoundedSource<T> getSource() {
-        return residualSource;
-      }
-
-      Checkpoint<T> getCheckpointMark() {
-        if (reader == null) {
-          // Reader hasn't started, checkpoint the residualSource.
-          return new Checkpoint<>(null /* residualElements */, residualSource);
-        } else {
-          // Part of residualSource are consumed.
-          // Splits the residualSource and tracks the new residualElements in current source.
-          BoundedSource<T> residualSplit = null;
-          Double fractionConsumed = reader.getFractionConsumed();
-          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
-            double fractionRest = 1 - fractionConsumed;
-            int splitAttempts = 8;
-            for (int i = 0; i < 8 && residualSplit == null; ++i) {
-              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
-              residualSplit = reader.splitAtFraction(fractionToSplit);
-            }
-          }
-          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
-          try {
-            while (advance()) {
-              newResidualElements.add(
-                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Failed to read elements from the bounded reader.", e);
-          }
-          return new Checkpoint<>(newResidualElements, residualSplit);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
deleted file mode 100644
index c479332..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.beam.runners.dataflow;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@see DataflowUnboundedReadFromBoundedSource}.
- */
-@RunWith(JUnit4.class)
-public class DataflowUnboundedReadFromBoundedSourceTest {
-  @Test
-  public void testKind() {
-    DataflowUnboundedReadFromBoundedSource<?> read = new
-        DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
-
-    assertEquals("Read(NoopNamedSource)", read.getKindString());
-  }
-
-  @Test
-  public void testKindAnonymousSource() {
-    NoopNamedSource anonSource = new NoopNamedSource() {};
-    DataflowUnboundedReadFromBoundedSource<?> read = new
-        DataflowUnboundedReadFromBoundedSource<>(anonSource);
-
-    assertEquals("Read(AnonymousSource)", read.getKindString());
-  }
-
-  /** Source implementation only useful for its identity. */
-  static class NoopNamedSource extends BoundedSource<String> {
-    @Override
-    public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
-        PipelineOptions options) throws Exception {
-      return null;
-    }
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return 0;
-    }
-    @Override
-    public BoundedReader<String> createReader(
-        PipelineOptions options) throws IOException {
-      return null;
-    }
-    @Override
-    public void validate() {
-
-    }
-    @Override
-    public Coder<String> getDefaultOutputCoder() {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 988a82b..fcc00f9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -31,9 +31,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.stateful.SparkTimerInternals;


[2/3] beam git commit: [BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java

Posted by dh...@apache.org.
[BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java

Now that it's in the construction package, we can delete the
Dataflow-specific clone that we couldn't use from runners-core.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66229142
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66229142
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66229142

Branch: refs/heads/master
Commit: 662291426af2c82c35085053fac0e9c2b845339f
Parents: e53f959
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 10 07:08:32 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 10 10:16:04 2017 -0700

----------------------------------------------------------------------
 .../translation/ApexPipelineTranslator.java     |   2 +-
 runners/core-construction-java/pom.xml          |  15 +
 .../UnboundedReadFromBoundedSource.java         | 542 ++++++++++++++++++
 .../UnboundedReadFromBoundedSourceTest.java     | 373 +++++++++++++
 runners/core-java/pom.xml                       |   5 -
 .../core/UnboundedReadFromBoundedSource.java    | 542 ------------------
 .../UnboundedReadFromBoundedSourceTest.java     | 373 -------------
 .../beam/runners/dataflow/DataflowRunner.java   |   3 +-
 .../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
 ...aflowUnboundedReadFromBoundedSourceTest.java |  79 ---
 .../beam/runners/spark/TestSparkRunner.java     |   2 +-
 11 files changed, 934 insertions(+), 1549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 42ff144..fdeefc7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.core.construction.PrimitiveCreate;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformHierarchy;

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 9619280..78b6819 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -60,6 +60,16 @@
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
@@ -69,6 +79,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
     <!-- test dependencies -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..6b7bd71
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} to an {@link UnboundedSource}.
+ *
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * and element timestamps are propagated. While any elements remain, the watermark is the beginning
+ * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
+ * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
+ * {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ */
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
+
+  private final BoundedSource<T> source;
+
+  /**
+   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+   */
+  public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
+    this.source = source;
+  }
+
+  @Override
+  public PCollection<T> expand(PBegin input) {
+    return input.getPipeline().apply(
+        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+  }
+
+  @Override
+  protected Coder<T> getDefaultOutputCoder() {
+    return source.getDefaultOutputCoder();
+  }
+
+  @Override
+  public String getKindString() {
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    // We explicitly do not register base-class data, instead we use the delegate inner source.
+    builder
+        .add(DisplayData.item("source", source.getClass()))
+        .include("source", source);
+  }
+
+  /**
+   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+   */
+  @VisibleForTesting
+  public static class BoundedToUnboundedSourceAdapter<T>
+      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+    private BoundedSource<T> boundedSource;
+
+    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+      this.boundedSource = boundedSource;
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    /**
+     * Splits the wrapped {@link BoundedSource} into bundles and wraps each bundle in its own
+     * adapter.
+     *
+     * <p>Splitting is best effort: if the computed bundle size is not positive, if
+     * {@code splitIntoBundles} returns {@code null}, or if any exception is thrown while
+     * estimating or splitting, a warning is logged and this adapter is returned as the only
+     * split.
+     *
+     * @param desiredNumSplits the number of splits the estimated size is divided by
+     * @param options the pipeline options handed to the wrapped source
+     * @return an immutable list with one adapter per bundle, or a singleton list holding this
+     *     adapter when the source could not be split
+     */
+    @Override
+    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      try {
+        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+        if (desiredBundleSize <= 0) {
+          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+              boundedSource);
+          return ImmutableList.of(this);
+        }
+        List<? extends BoundedSource<T>> splits =
+            boundedSource.splitIntoBundles(desiredBundleSize, options);
+        if (splits == null) {
+          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+          return ImmutableList.of(this);
+        }
+        // Lists.transform only builds a lazy view that re-applies the function on every access.
+        // Copy it so that each bundle is wrapped exactly once and callers get a stable list,
+        // consistent with the ImmutableList returned by the fallback paths.
+        return ImmutableList.copyOf(
+            Lists.transform(
+                splits,
+                new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+                  @Override
+                  public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+                    return new BoundedToUnboundedSourceAdapter<>(input);
+                  }
+                }));
+      } catch (Exception e) {
+        // Deliberately broad: any failure here falls back to reading the source unsplit.
+        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+        return ImmutableList.of(this);
+      }
+    }
+
+    @Override
+    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+        throws IOException {
+      if (checkpoint == null) {
+        return new Reader(null /* residualElements */, boundedSource, options);
+      } else {
+        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+      }
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return boundedSource.getDefaultOutputCoder();
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+    }
+
+    @VisibleForTesting
+    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+      private final @Nullable List<TimestampedValue<T>> residualElements;
+      private final @Nullable BoundedSource<T> residualSource;
+
+      public Checkpoint(
+          @Nullable List<TimestampedValue<T>> residualElements,
+          @Nullable BoundedSource<T> residualSource) {
+        this.residualElements = residualElements;
+        this.residualSource = residualSource;
+      }
+
+      @Override
+      public void finalizeCheckpoint() {}
+
+      @VisibleForTesting
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
+        return residualElements;
+      }
+
+      @VisibleForTesting
+      @Nullable BoundedSource<T> getResidualSource() {
+        return residualSource;
+      }
+    }
+
+    @VisibleForTesting
+    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+      @JsonCreator
+      public static CheckpointCoder<?> of(
+          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+          List<Coder<?>> components) {
+        checkArgument(components.size() == 1,
+            "Expecting 1 components, got %s", components.size());
+        return new CheckpointCoder<>(components.get(0));
+      }
+
+      // The coder for a list of residual elements and their timestamps
+      private final Coder<List<TimestampedValue<T>>> elemsCoder;
+      // The coder from the BoundedReader for coding each element
+      private final Coder<T> elemCoder;
+      // The nullable and serializable coder for the BoundedSource.
+      @SuppressWarnings("rawtypes")
+      private final Coder<BoundedSource> sourceCoder;
+
+      CheckpointCoder(Coder<T> elemCoder) {
+        this.elemsCoder = NullableCoder.of(
+            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+        this.elemCoder = elemCoder;
+        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+      }
+
+      @Override
+      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+          throws CoderException, IOException {
+        elemsCoder.encode(value.residualElements, outStream, context.nested());
+        sourceCoder.encode(value.residualSource, outStream, context);
+      }
+
+      /**
+       * Reads a checkpoint back in the order it was written: first the nullable list of
+       * residual elements (in a nested context), then the nullable residual source.
+       */
+      @SuppressWarnings("unchecked")
+      @Override
+      public Checkpoint<T> decode(InputStream inStream, Context context)
+          throws CoderException, IOException {
+        List<TimestampedValue<T>> decodedElements =
+            elemsCoder.decode(inStream, context.nested());
+        BoundedSource<T> decodedSource = sourceCoder.decode(inStream, context);
+        return new Checkpoint<>(decodedElements, decodedSource);
+      }
+
+      @Override
+      public List<Coder<?>> getCoderArguments() {
+        return Arrays.<Coder<?>>asList(elemCoder);
+      }
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        throw new NonDeterministicException(this,
+            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+      }
+    }
+
+    /**
+     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+     * {@link ResidualElements} and {@link ResidualSource}.
+     *
+     * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
+     * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
+     * be split into {@link ResidualElements} and {@link ResidualSource}.
+     */
+    @VisibleForTesting
+    class Reader extends UnboundedReader<T> {
+      private ResidualElements residualElements;
+      private @Nullable ResidualSource residualSource;
+      private final PipelineOptions options;
+      private boolean done;
+
+      Reader(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        init(residualElementsList, residualSource, options);
+        this.options = checkNotNull(options, "options");
+        this.done = false;
+      }
+
+      private void init(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        this.residualElements = residualElementsList == null
+            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+                : new ResidualElements(residualElementsList);
+        this.residualSource =
+            residualSource == null ? null : new ResidualSource(residualSource, options);
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (residualElements.advance()) {
+          return true;
+        } else if (residualSource != null && residualSource.advance()) {
+          return true;
+        } else {
+          done = true;
+          return false;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (residualSource != null) {
+          residualSource.close();
+        }
+      }
+
+      /**
+       * Returns the current element, preferring the residual elements and falling back to the
+       * residual source.
+       *
+       * @throws NoSuchElementException if there is no current residual element and no residual
+       *     source
+       */
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrent();
+        }
+        if (residualSource == null) {
+          throw new NoSuchElementException();
+        }
+        return residualSource.getCurrent();
+      }
+
+      /**
+       * Returns the timestamp of the current element, preferring the residual elements and
+       * falling back to the residual source.
+       *
+       * @throws NoSuchElementException if there is no current residual element and no residual
+       *     source
+       */
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrentTimestamp();
+        }
+        if (residualSource == null) {
+          throw new NoSuchElementException();
+        }
+        return residualSource.getCurrentTimestamp();
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      /**
+       * {@inheritDoc}
+       *
+       * <p>If the {@link ResidualElements} have not been exhausted, the new checkpoint contains
+       * the elements of {@link ResidualElements} that have not been returned yet, together with
+       * the source held by the {@link ResidualSource} (or {@code null} if there is none).
+       *
+       * <p>If the {@link ResidualElements} are exhausted and a {@link ResidualSource} exists,
+       * the new checkpoint is produced by the {@link ResidualSource}: it attempts to split
+       * itself into a residual source and reads the elements remaining in the current source
+       * into the checkpoint's residual elements. For an unsplittable source, all remaining
+       * elements end up in the residual elements.
+       *
+       * <p>If neither is left, the checkpoint holds {@code null} for both parts.
+       *
+       * <p>Calling this method re-initializes this reader from the returned checkpoint, so
+       * subsequent reads continue from the checkpointed state.
+       */
+      @Override
+      public Checkpoint<T> getCheckpointMark() {
+        Checkpoint<T> newCheckpoint;
+        if (!residualElements.done()) {
+          // Part of residualElements are consumed.
+          // Checkpoints the remaining elements and residualSource.
+          newCheckpoint = new Checkpoint<>(
+              residualElements.getRestElements(),
+              residualSource == null ? null : residualSource.getSource());
+        } else if (residualSource != null) {
+          // residualElements are exhausted; let residualSource build the checkpoint.
+          newCheckpoint = residualSource.getCheckpointMark();
+        } else {
+          // Nothing is left to read, so the checkpoint is empty.
+          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+        }
+        // Re-initialize since the residualElements and the residualSource might be
+        // consumed or split by checkpointing.
+        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+        return newCheckpoint;
+      }
+
+      @Override
+      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+        return BoundedToUnboundedSourceAdapter.this;
+      }
+    }
+
+    private class ResidualElements {
+      private final List<TimestampedValue<T>> elementsList;
+      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+      private @Nullable TimestampedValue<T> currentT;
+      private boolean hasCurrent;
+      private boolean done;
+
+      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+        this.elementsIterator = null;
+        this.currentT = null;
+        this.hasCurrent = false;
+        this.done = false;
+      }
+
+      /**
+       * Steps to the next buffered element, creating the iterator on first use.
+       *
+       * @return {@code true} if an element is now current; {@code false} once the list is
+       *     exhausted, at which point this instance is marked as done
+       */
+      public boolean advance() {
+        if (elementsIterator == null) {
+          elementsIterator = elementsList.iterator();
+        }
+        if (!elementsIterator.hasNext()) {
+          done = true;
+          hasCurrent = false;
+          return false;
+        }
+        currentT = elementsIterator.next();
+        hasCurrent = true;
+        return true;
+      }
+
+      boolean hasCurrent() {
+        return hasCurrent;
+      }
+
+      boolean done() {
+        return done;
+      }
+
+      TimestampedValue<T> getCurrentTimestampedValue() {
+        if (!hasCurrent) {
+          throw new NoSuchElementException();
+        }
+        return currentT;
+      }
+
+      T getCurrent() {
+        return getCurrentTimestampedValue().getValue();
+      }
+
+      Instant getCurrentTimestamp() {
+        return getCurrentTimestampedValue().getTimestamp();
+      }
+
+      /**
+       * Returns the elements that have not been handed out yet. If iteration never started
+       * this is the backing list itself; otherwise the rest of the iterator is drained into a
+       * new list.
+       */
+      List<TimestampedValue<T>> getRestElements() {
+        if (elementsIterator == null) {
+          return elementsList;
+        }
+        // Copies everything the iterator has not returned yet into a fresh ArrayList.
+        return Lists.newArrayList(elementsIterator);
+      }
+    }
+
+    private class ResidualSource {
+      private BoundedSource<T> residualSource;
+      private PipelineOptions options;
+      private @Nullable BoundedReader<T> reader;
+      private boolean closed;
+      private boolean readerDone;
+
+      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+        this.residualSource = checkNotNull(residualSource, "residualSource");
+        this.options = checkNotNull(options, "options");
+        this.reader = null;
+        this.closed = false;
+        this.readerDone = false;
+      }
+
+      /**
+       * Moves the underlying bounded reader forward, creating and starting it on the first
+       * call. Once the reader reports no more elements, later calls return {@code false}
+       * without touching the reader again.
+       *
+       * @throws IllegalArgumentException if this residual source has already been closed
+       */
+      private boolean advance() throws IOException {
+        checkArgument(!closed, "advance() call on closed %s", getClass().getName());
+        if (readerDone) {
+          return false;
+        }
+        boolean hasElement;
+        if (reader == null) {
+          reader = residualSource.createReader(options);
+          hasElement = reader.start();
+        } else {
+          hasElement = reader.advance();
+        }
+        readerDone = !hasElement;
+        return hasElement;
+      }
+
+      T getCurrent() throws NoSuchElementException {
+        if (reader == null) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrent();
+      }
+
+      Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (reader == null) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrentTimestamp();
+      }
+
+      void close() throws IOException {
+        if (reader != null) {
+          reader.close();
+          reader = null;
+        }
+        closed = true;
+      }
+
+      BoundedSource<T> getSource() {
+        return residualSource;
+      }
+
+      Checkpoint<T> getCheckpointMark() {
+        if (reader == null) {
+          // Reader hasn't started, checkpoint the residualSource.
+          return new Checkpoint<>(null /* residualElements */, residualSource);
+        } else {
+          // Part of residualSource are consumed.
+          // Splits the residualSource and tracks the new residualElements in current source.
+          BoundedSource<T> residualSplit = null;
+          Double fractionConsumed = reader.getFractionConsumed();
+          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+            double fractionRest = 1 - fractionConsumed;
+            int splitAttempts = 8;
+            for (int i = 0; i < 8 && residualSplit == null; ++i) {
+              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+              residualSplit = reader.splitAtFraction(fractionToSplit);
+            }
+          }
+          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+          try {
+            while (advance()) {
+              newResidualElements.add(
+                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+            }
+          } catch (IOException e) {
+            throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+          }
+          return new Checkpoint<>(newResidualElements, residualSplit);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..c905cf5
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link UnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadFromBoundedSourceTest {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void testCheckpointCoderNulls() throws Exception {
+    CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of());
+    Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null);
+    Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray(
+        coder,
+        CoderUtils.encodeToByteArray(coder, emptyCheckpoint));
+    assertNull(decodedEmptyCheckpoint.getResidualElements());
+    assertNull(decodedEmptyCheckpoint.getResidualSource());
+  }
+
+  @Test
+  public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testBoundedToUnboundedSourceAdapter() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PCollection<Long> output =
+        p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
+
+    // Count == numElements
+    PAssert
+      .thatSingleton(output.apply("Count", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Unique count == numElements
+    PAssert
+      .thatSingleton(output.apply(Distinct.<Long>create())
+                          .apply("UniqueCount", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Min == 0
+    PAssert
+      .thatSingleton(output.apply("Min", Min.<Long>globally()))
+      .isEqualTo(0L);
+    // Max == numElements-1
+    PAssert
+      .thatSingleton(output.apply("Max", Max.<Long>globally()))
+      .isEqualTo(numElements - 1);
+    p.run();
+  }
+
+  @Test
+  public void testCountingSourceToUnboundedCheckpoint() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    List<Long> expected = Lists.newArrayList();
+    for (long i = 0; i < numElements; ++i) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
+  }
+
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(100);
+    writeFile(compressedFile, input);
+
+    BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
+  }
+
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        unboundedSource.createReader(options, null);
+
+    List<T> actual = Lists.newArrayList();
+    for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+      actual.add(reader.getCurrent());
+      // checkpoint every 9 elements
+      if (actual.size() % 9 == 0) {
+        Checkpoint<T> checkpoint = reader.getCheckpointMark();
+        checkpoint.finalizeCheckpoint();
+      }
+    }
+    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+    assertTrue(checkpointDone.getResidualElements() == null
+        || checkpointDone.getResidualElements().isEmpty());
+
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  @Test
+  public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    List<Long> expected = Lists.newArrayList();
+    for (long i = 0; i < numElements; ++i) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
+  }
+
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(1000);
+    writeFile(compressedFile, input);
+
+    BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1);
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
+  }
+
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        unboundedSource.createReader(options, null);
+
+    List<T> actual = Lists.newArrayList();
+    for (boolean hasNext = reader.start(); hasNext;) {
+      actual.add(reader.getCurrent());
+      // checkpoint every 9 elements
+      if (actual.size() % 9 == 0) {
+        Checkpoint<T> checkpoint = reader.getCheckpointMark();
+        Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder();
+        Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray(
+            checkpointCoder,
+            CoderUtils.encodeToByteArray(checkpointCoder, checkpoint));
+        reader.close();
+        checkpoint.finalizeCheckpoint();
+
+        BoundedToUnboundedSourceAdapter<T>.Reader restarted =
+            unboundedSource.createReader(options, decodedCheckpoint);
+        reader = restarted;
+        hasNext = reader.start();
+      } else {
+        hasNext = reader.advance();
+      }
+    }
+    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+    assertTrue(checkpointDone.getResidualElements() == null
+        || checkpointDone.getResidualElements().isEmpty());
+
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  @Test
+  public void testReadBeforeStart() throws Exception {
+    thrown.expect(NoSuchElementException.class);
+
+    BoundedSource<Long> countingSource = CountingSource.upTo(100);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(countingSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    unboundedSource.createReader(options, null).getCurrent();
+  }
+
+  @Test
+  public void testReadFromCheckpointBeforeStart() throws Exception {
+    thrown.expect(NoSuchElementException.class);
+
+    BoundedSource<Long> countingSource = CountingSource.upTo(100);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(countingSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<TimestampedValue<Long>> elements =
+        ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
+    Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource);
+    unboundedSource.createReader(options, checkpoint).getCurrent();
+  }
+
+  /**
+   * Generate byte array of given size.
+   */
+  private static byte[] generateInput(int size) {
+    // Arbitrary but fixed seed
+    Random random = new Random(285930);
+    byte[] buff = new byte[size];
+    random.nextBytes(buff);
+    return buff;
+  }
+
+  /**
+   * Writes a single output file.
+   */
+  private static void writeFile(File file, byte[] input) throws IOException {
+    try (OutputStream os = new FileOutputStream(file)) {
+      os.write(input);
+    }
+  }
+
+  /**
+   * Unsplittable source for use in tests.
+   */
+  private static class UnsplittableSource extends FileBasedSource<Byte> {
+    public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
+      super(fileOrPatternSpec, minBundleSize);
+    }
+
+    public UnsplittableSource(
+        String fileName, long minBundleSize, long startOffset, long endOffset) {
+      super(fileName, minBundleSize, startOffset, endOffset);
+    }
+
+    @Override
+    protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
+      return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
+    }
+
+    @Override
+    protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
+      return new UnsplittableReader(this);
+    }
+
+    @Override
+    public Coder<Byte> getDefaultOutputCoder() {
+      return SerializableCoder.of(Byte.class);
+    }
+
+    private static class UnsplittableReader extends FileBasedReader<Byte> {
+      ByteBuffer buff = ByteBuffer.allocate(1);
+      Byte current;
+      long offset;
+      ReadableByteChannel channel;
+
+      public UnsplittableReader(UnsplittableSource source) {
+        super(source);
+        offset = source.getStartOffset() - 1;
+      }
+
+      /**
+       * Returns the most recently read byte.
+       *
+       * @throws NoSuchElementException if no record has been read yet
+       */
+      @Override
+      public Byte getCurrent() throws NoSuchElementException {
+        // current stays null until readNextRecord() has produced a byte.
+        if (current == null) {
+          throw new NoSuchElementException();
+        }
+        return current;
+      }
+
+      @Override
+      public boolean allowsDynamicSplitting() {
+        return false;
+      }
+
+      @Override
+      protected boolean isAtSplitPoint() {
+        return true;
+      }
+
+      @Override
+      protected void startReading(ReadableByteChannel channel) throws IOException {
+        this.channel = channel;
+      }
+
+      @Override
+      protected boolean readNextRecord() throws IOException {
+        buff.clear();
+        if (channel.read(buff) != 1) {
+          return false;
+        }
+        current = buff.get(0);
+        offset += 1;
+        return true;
+      }
+
+      @Override
+      protected long getCurrentOffset() {
+        return offset;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 631e8c0..affd1a9 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -105,11 +105,6 @@
       <artifactId>joda-time</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
     <!-- test dependencies -->
 
     <!-- Utilities such as WindowMatchers -->

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
deleted file mode 100644
index 0c173a0..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
- * {@link BoundedSource}.
- * Sources that cannot be split are read entirely into memory, so this transform does not work well
- * with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- */
-public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
-
-  private final BoundedSource<T> source;
-
-  /**
-   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
-   *
-   * @param source the bounded source to read; stored as given, without validation
-   */
-  public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
-    this.source = source;
-  }
-
-  /**
-   * Wraps the bounded source in a {@link BoundedToUnboundedSourceAdapter} and applies a
-   * {@link Read} of that adapter to the pipeline of {@code input}.
-   */
-  @Override
-  public PCollection<T> expand(PBegin input) {
-    BoundedToUnboundedSourceAdapter<T> adapter = new BoundedToUnboundedSourceAdapter<>(source);
-    return input.getPipeline().apply(Read.from(adapter));
-  }
-
-  /** Delegates to the wrapped bounded source for the output coder. */
-  @Override
-  protected Coder<T> getDefaultOutputCoder() {
-    Coder<T> sourceCoder = source.getDefaultOutputCoder();
-    return sourceCoder;
-  }
-
-  @Override
-  public String getKindString() {
-    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    // We explicitly do not register base-class data, instead we use the delegate inner source.
-    builder
-        .add(DisplayData.item("source", source.getClass()))
-        .include("source", source);
-  }
-
-  /**
-   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
-   */
-  @VisibleForTesting
-  public static class BoundedToUnboundedSourceAdapter<T>
-      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
-    private BoundedSource<T> boundedSource;
-
-    /**
-     * Creates an adapter around {@code boundedSource}.
-     *
-     * @param boundedSource the source to expose as unbounded; stored as given, without validation
-     */
-    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
-      this.boundedSource = boundedSource;
-    }
-
-    /** Validates this adapter by validating the wrapped bounded source. */
-    @Override
-    public void validate() {
-      boundedSource.validate();
-    }
-
-    /**
-     * Splits the wrapped {@link BoundedSource} into bundles of roughly
-     * {@code estimatedSize / desiredNumSplits} bytes and wraps each bundle in its own adapter.
-     *
-     * <p>Falls back to a single split consisting of this adapter when the estimated bundle size
-     * is not positive, when splitting returns {@code null}, or when any exception is thrown while
-     * estimating or splitting (the exception is logged, not propagated).
-     *
-     * @param desiredNumSplits the number of splits requested by the caller
-     * @param options pipeline options forwarded to the wrapped source
-     * @return an immutable list of adapters, built eagerly
-     */
-    @Override
-    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      try {
-        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
-        if (desiredBundleSize <= 0) {
-          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
-              boundedSource);
-          return ImmutableList.of(this);
-        }
-        List<? extends BoundedSource<T>> splits =
-            boundedSource.splitIntoBundles(desiredBundleSize, options);
-        if (splits == null) {
-          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
-          return ImmutableList.of(this);
-        }
-        // Lists.transform only returns a lazy view that would create a fresh adapter on every
-        // access; copy it so each split is wrapped exactly once.
-        return ImmutableList.copyOf(
-            Lists.transform(
-                splits,
-                new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
-                  @Override
-                  public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
-                    return new BoundedToUnboundedSourceAdapter<>(input);
-                  }}));
-      } catch (Exception e) {
-        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
-        return ImmutableList.of(this);
-      }
-    }
-
-    /**
-     * Creates a reader that resumes from {@code checkpoint}, or one that reads the whole wrapped
-     * source when {@code checkpoint} is null.
-     */
-    @Override
-    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
-        throws IOException {
-      if (checkpoint != null) {
-        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
-      }
-      return new Reader(null /* residualElements */, boundedSource, options);
-    }
-
-    /** Delegates to the wrapped bounded source for the element coder. */
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      Coder<T> elementCoder = boundedSource.getDefaultOutputCoder();
-      return elementCoder;
-    }
-
-    /** Returns a {@link CheckpointCoder} built on the wrapped source's element coder. */
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
-      Coder<T> elementCoder = boundedSource.getDefaultOutputCoder();
-      return new CheckpointCoder<>(elementCoder);
-    }
-
-    /**
-     * Checkpoint of a {@link Reader}: the elements that still have to be emitted from memory and
-     * the bounded source that still has to be read. Either part may be null.
-     */
-    @VisibleForTesting
-    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
-      private final @Nullable List<TimestampedValue<T>> residualElements;
-      private final @Nullable BoundedSource<T> residualSource;
-
-      public Checkpoint(
-          @Nullable List<TimestampedValue<T>> residualElements,
-          @Nullable BoundedSource<T> residualSource) {
-        this.residualElements = residualElements;
-        this.residualSource = residualSource;
-      }
-
-      /** Intentionally a no-op. */
-      @Override
-      public void finalizeCheckpoint() {}
-
-      /** Returns the in-memory elements of this checkpoint, possibly null. */
-      @VisibleForTesting
-      @Nullable List<TimestampedValue<T>> getResidualElements() {
-        return residualElements;
-      }
-
-      /** Returns the not yet read source of this checkpoint, possibly null. */
-      @VisibleForTesting
-      @Nullable BoundedSource<T> getResidualSource() {
-        return residualSource;
-      }
-    }
-
-    /**
-     * Coder for {@link Checkpoint}: the residual elements are written with a nullable list coder
-     * built on the element coder, the residual source with nullable Java serialization.
-     */
-    @VisibleForTesting
-    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
-      /**
-       * JSON factory: builds the coder from its single component, the element coder.
-       *
-       * @throws IllegalArgumentException if {@code components} does not hold exactly one coder
-       */
-      @JsonCreator
-      public static CheckpointCoder<?> of(
-          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-          List<Coder<?>> components) {
-        checkArgument(components.size() == 1,
-            "Expecting 1 components, got %s", components.size());
-        return new CheckpointCoder<>(components.get(0));
-      }
-
-      // The coder for a list of residual elements and their timestamps
-      private final Coder<List<TimestampedValue<T>>> elemsCoder;
-      // The coder from the BoundedReader for coding each element
-      private final Coder<T> elemCoder;
-      // The nullable and serializable coder for the BoundedSource.
-      @SuppressWarnings("rawtypes")
-      private final Coder<BoundedSource> sourceCoder;
-
-      CheckpointCoder(Coder<T> elemCoder) {
-        this.elemsCoder = NullableCoder.of(
-            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
-        this.elemCoder = elemCoder;
-        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
-      }
-
-      /**
-       * Writes the residual elements first (in the nested context), then the residual source
-       * (in the caller's context). {@link #decode} reads them back in the same order.
-       */
-      @Override
-      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
-          throws CoderException, IOException {
-        elemsCoder.encode(value.residualElements, outStream, context.nested());
-        sourceCoder.encode(value.residualSource, outStream, context);
-      }
-
-      /** Reads back a checkpoint written by {@link #encode}: elements first, then the source. */
-      @SuppressWarnings("unchecked")
-      @Override
-      public Checkpoint<T> decode(InputStream inStream, Context context)
-          throws CoderException, IOException {
-        return new Checkpoint<>(
-            elemsCoder.decode(inStream, context.nested()),
-            sourceCoder.decode(inStream, context));
-      }
-
-      /** Returns the element coder as the only coder argument. */
-      @Override
-      public List<Coder<?>> getCoderArguments() {
-        return Arrays.<Coder<?>>asList(elemCoder);
-      }
-
-      /** Always throws: the source part is encoded with Java serialization. */
-      @Override
-      public void verifyDeterministic() throws NonDeterministicException {
-        throw new NonDeterministicException(this,
-            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
-      }
-    }
-
-    /**
-     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
-     * {@link ResidualElements} and {@link ResidualSource}.
-     *
-     * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
-     * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
-     * be split into {@link ResidualElements} and {@link ResidualSource}.
-     */
-    @VisibleForTesting
-    class Reader extends UnboundedReader<T> {
-      // In-memory elements; these are emitted before anything is read from residualSource.
-      private ResidualElements residualElements;
-      // Source still to be read; null when nothing is left besides residualElements.
-      private @Nullable ResidualSource residualSource;
-      private final PipelineOptions options;
-      // Set once advance() has found both residualElements and residualSource exhausted.
-      private boolean done;
-
-      /**
-       * Creates a reader over the given residual elements and residual source.
-       *
-       * @param residualElementsList elements to emit first; null is treated as an empty list
-       * @param residualSource source to read afterwards, or null if there is none
-       * @param options pipeline options; must not be null
-       */
-      Reader(
-          @Nullable List<TimestampedValue<T>> residualElementsList,
-          @Nullable BoundedSource<T> residualSource,
-          PipelineOptions options) {
-        init(residualElementsList, residualSource, options);
-        this.options = checkNotNull(options, "options");
-        this.done = false;
-      }
-
-      /**
-       * (Re-)creates the {@link ResidualElements} and {@link ResidualSource} wrappers, discarding
-       * any iteration state of the previous ones.
-       */
-      private void init(
-          @Nullable List<TimestampedValue<T>> residualElementsList,
-          @Nullable BoundedSource<T> residualSource,
-          PipelineOptions options) {
-        this.residualElements = residualElementsList == null
-            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
-                : new ResidualElements(residualElementsList);
-        this.residualSource =
-            residualSource == null ? null : new ResidualSource(residualSource, options);
-      }
-
-      /** Starting is the same as advancing to the first element. */
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
-
-      /**
-       * Moves to the next element, taking it from the residual elements while any remain and from
-       * the residual source afterwards.
-       *
-       * @return false, after marking this reader done, when neither has another element
-       */
-      @Override
-      public boolean advance() throws IOException {
-        if (residualElements.advance()) {
-          return true;
-        } else if (residualSource != null && residualSource.advance()) {
-          return true;
-        } else {
-          done = true;
-          return false;
-        }
-      }
-
-      /** Closes the residual source, if there is one. */
-      @Override
-      public void close() throws IOException {
-        if (residualSource != null) {
-          residualSource.close();
-        }
-      }
-
-      /**
-       * Returns the current element, from the residual elements if they hold one and otherwise
-       * from the residual source.
-       *
-       * @throws NoSuchElementException if the residual elements hold no current element and
-       *     there is no residual source
-       */
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        if (residualElements.hasCurrent()) {
-          return residualElements.getCurrent();
-        } else if (residualSource != null) {
-          return residualSource.getCurrent();
-        } else {
-          throw new NoSuchElementException();
-        }
-      }
-
-      /**
-       * Returns the timestamp of the current element, using the same lookup order as
-       * {@link #getCurrent}.
-       *
-       * @throws NoSuchElementException if the residual elements hold no current element and
-       *     there is no residual source
-       */
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        if (residualElements.hasCurrent()) {
-          return residualElements.getCurrentTimestamp();
-        } else if (residualSource != null) {
-          return residualSource.getCurrentTimestamp();
-        } else {
-          throw new NoSuchElementException();
-        }
-      }
-
-      /** Returns the minimum timestamp until this reader is done, the maximum afterwards. */
-      @Override
-      public Instant getWatermark() {
-        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      /**
-       * {@inheritDoc}
-       *
-       * <p>If only part of the {@link ResidualElements} is consumed, the new
-       * checkpoint will contain the remaining elements in {@link ResidualElements} and
-       * the {@link ResidualSource}.
-       *
-       * <p>If all {@link ResidualElements} and part of the
-       * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
-       * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
-       * {@link ResidualSource} is the source split from the current source,
-       * and {@link ResidualElements} contains rest elements from the current source after
-       * the splitting. For unsplittable source, it will put all remaining elements into
-       * the {@link ResidualElements}.
-       */
-      @Override
-      public Checkpoint<T> getCheckpointMark() {
-        Checkpoint<T> newCheckpoint;
-        if (!residualElements.done()) {
-          // Part of residualElements are consumed.
-          // Checkpoints the remaining elements and residualSource.
-          newCheckpoint = new Checkpoint<>(
-              residualElements.getRestElements(),
-              residualSource == null ? null : residualSource.getSource());
-        } else if (residualSource != null) {
-          newCheckpoint = residualSource.getCheckpointMark();
-        } else {
-          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
-        }
-        // Re-initialize since the residualElements and the residualSource might be
-        // consumed or split by checkpointing.
-        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
-        return newCheckpoint;
-      }
-
-      /** Returns the enclosing adapter. */
-      @Override
-      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
-        return BoundedToUnboundedSourceAdapter.this;
-      }
-    }
-
-    /**
-     * Cursor over an in-memory list of timestamped elements that can hand back whatever has not
-     * been iterated yet.
-     */
-    private class ResidualElements {
-      private final List<TimestampedValue<T>> elementsList;
-      // Created on the first advance(); null means nothing has been consumed yet.
-      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
-      private @Nullable TimestampedValue<T> currentT;
-      private boolean hasCurrent;
-      private boolean done;
-
-      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
-        elementsList = checkNotNull(residualElementsList, "residualElementsList");
-        elementsIterator = null;
-        currentT = null;
-        hasCurrent = false;
-        done = false;
-      }
-
-      /** Moves to the next element; returns false and marks this cursor done when none is left. */
-      public boolean advance() {
-        if (elementsIterator == null) {
-          elementsIterator = elementsList.iterator();
-        }
-        if (!elementsIterator.hasNext()) {
-          done = true;
-          hasCurrent = false;
-          return false;
-        }
-        currentT = elementsIterator.next();
-        hasCurrent = true;
-        return true;
-      }
-
-      boolean hasCurrent() {
-        return hasCurrent;
-      }
-
-      boolean done() {
-        return done;
-      }
-
-      /** Returns the current element, or throws if the last advance did not produce one. */
-      TimestampedValue<T> getCurrentTimestampedValue() {
-        if (hasCurrent) {
-          return currentT;
-        }
-        throw new NoSuchElementException();
-      }
-
-      T getCurrent() {
-        TimestampedValue<T> timestamped = getCurrentTimestampedValue();
-        return timestamped.getValue();
-      }
-
-      Instant getCurrentTimestamp() {
-        TimestampedValue<T> timestamped = getCurrentTimestampedValue();
-        return timestamped.getTimestamp();
-      }
-
-      /**
-       * Returns the elements not yet iterated: the original list if iteration never started,
-       * otherwise a new list holding the rest of the iterator (which is drained by this call).
-       */
-      List<TimestampedValue<T>> getRestElements() {
-        if (elementsIterator == null) {
-          return elementsList;
-        }
-        return Lists.newArrayList(elementsIterator);
-      }
-    }
-
-    /**
-     * Lazily reads a {@link BoundedSource} and, when checkpointed, splits off the part that has
-     * not been read yet.
-     */
-    private class ResidualSource {
-      private BoundedSource<T> residualSource;
-      private PipelineOptions options;
-      // Created on the first advance(); null before that and after close().
-      private @Nullable BoundedReader<T> reader;
-      private boolean closed;
-      private boolean readerDone;
-
-      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
-        this.residualSource = checkNotNull(residualSource, "residualSource");
-        this.options = checkNotNull(options, "options");
-        this.reader = null;
-        this.closed = false;
-        this.readerDone = false;
-      }
-
-      /**
-       * Moves to the next element, creating and starting the reader on the first call.
-       *
-       * @return false once the reader has reported that no more elements are available
-       * @throws IllegalArgumentException if this object has already been closed
-       */
-      private boolean advance() throws IOException {
-        checkArgument(!closed, "advance() call on closed %s", getClass().getName());
-        if (readerDone) {
-          return false;
-        }
-        if (reader == null) {
-          reader = residualSource.createReader(options);
-          readerDone = !reader.start();
-        } else {
-          readerDone = !reader.advance();
-        }
-        return !readerDone;
-      }
-
-      /** Returns the reader's current element; throws if no reader exists. */
-      T getCurrent() throws NoSuchElementException {
-        if (reader == null) {
-          throw new NoSuchElementException();
-        }
-        return reader.getCurrent();
-      }
-
-      /** Returns the reader's current timestamp; throws if no reader exists. */
-      Instant getCurrentTimestamp() throws NoSuchElementException {
-        if (reader == null) {
-          throw new NoSuchElementException();
-        }
-        return reader.getCurrentTimestamp();
-      }
-
-      /** Closes the reader, if one was created, and marks this object closed. */
-      void close() throws IOException {
-        if (reader != null) {
-          reader.close();
-          reader = null;
-        }
-        closed = true;
-      }
-
-      BoundedSource<T> getSource() {
-        return residualSource;
-      }
-
-      /**
-       * Creates a checkpoint for the unread part of this source.
-       *
-       * <p>If reading has not started, the checkpoint is just the source. Otherwise the reader is
-       * asked to split off a residual source at up to {@code splitAttempts} increasing fractions;
-       * everything the reader still yields afterwards is drained into the checkpoint's residual
-       * elements. The drained reader is then closed, because the owning {@link Reader} replaces
-       * this object once the checkpoint has been taken.
-       *
-       * @throws RuntimeException if draining the reader fails with an {@link IOException}
-       */
-      Checkpoint<T> getCheckpointMark() {
-        if (reader == null) {
-          // Reader hasn't started, checkpoint the residualSource.
-          return new Checkpoint<>(null /* residualElements */, residualSource);
-        } else {
-          // Part of residualSource are consumed.
-          // Splits the residualSource and tracks the new residualElements in current source.
-          BoundedSource<T> residualSplit = null;
-          Double fractionConsumed = reader.getFractionConsumed();
-          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
-            double fractionRest = 1 - fractionConsumed;
-            int splitAttempts = 8;
-            for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
-              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
-              residualSplit = reader.splitAtFraction(fractionToSplit);
-            }
-          }
-          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
-          try {
-            while (advance()) {
-              newResidualElements.add(
-                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Failed to read elements from the bounded reader.", e);
-          }
-          // The reader is fully drained and nothing else will close it, so release it here.
-          // A failure to close does not invalidate the elements that were already read.
-          try {
-            close();
-          } catch (IOException e) {
-            LOG.warn("Failed to close the drained reader of {}.", residualSource, e);
-          }
-          return new Checkpoint<>(newResidualElements, residualSplit);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
deleted file mode 100644
index e1968cb..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.FileBasedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link UnboundedReadFromBoundedSource}.
- */
-@RunWith(JUnit4.class)
-public class UnboundedReadFromBoundedSourceTest {
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  @Test
-  public void testCheckpointCoderNulls() throws Exception {
-    CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of());
-    Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null);
-    Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray(
-        coder,
-        CoderUtils.encodeToByteArray(coder, emptyCheckpoint));
-    assertNull(decodedEmptyCheckpoint.getResidualElements());
-    assertNull(decodedEmptyCheckpoint.getResidualSource());
-  }
-
-  /** Checks that a {@code CheckpointCoder} built on the global window coder is serializable. */
-  @Test
-  public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
-    CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE));
-  }
-
-  /**
-   * Reads a counting source through the adapter in a pipeline and checks the count, the number
-   * of distinct values, the minimum and the maximum of the output.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testBoundedToUnboundedSourceAdapter() throws Exception {
-    long numElements = 100;
-    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
-    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
-        new BoundedToUnboundedSourceAdapter<>(boundedSource);
-
-    PCollection<Long> output =
-        p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
-
-    // Count == numElements
-    PCollection<Long> count = output.apply("Count", Count.<Long>globally());
-    PAssert.thatSingleton(count).isEqualTo(numElements);
-
-    // Unique count == numElements
-    PCollection<Long> uniqueCount =
-        output.apply(Distinct.<Long>create()).apply("UniqueCount", Count.<Long>globally());
-    PAssert.thatSingleton(uniqueCount).isEqualTo(numElements);
-
-    // Min == 0
-    PCollection<Long> min = output.apply("Min", Min.<Long>globally());
-    PAssert.thatSingleton(min).isEqualTo(0L);
-
-    // Max == numElements-1
-    PCollection<Long> max = output.apply("Max", Max.<Long>globally());
-    PAssert.thatSingleton(max).isEqualTo(numElements - 1);
-
-    p.run();
-  }
-
-  /** Runs the checkpointing scenario against a splittable counting source. */
-  @Test
-  public void testCountingSourceToUnboundedCheckpoint() throws Exception {
-    long numElements = 100;
-    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
-    List<Long> expected = Lists.newArrayList();
-    long value = 0;
-    while (value < numElements) {
-      expected.add(value);
-      value++;
-    }
-    testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
-  }
-
-  /** Runs the checkpointing scenario against an {@code UnsplittableSource} over a temp file. */
-  @Test
-  public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
-    String baseName = "test-input";
-    File compressedFile = tmpFolder.newFile(baseName + ".gz");
-    byte[] input = generateInput(100);
-    writeFile(compressedFile, input);
-
-    String path = compressedFile.getPath();
-    BoundedSource<Byte> source = new UnsplittableSource(path, 1);
-    List<Byte> expected = Lists.newArrayList();
-    for (int index = 0; index < input.length; index++) {
-      expected.add(input[index]);
-    }
-    testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
-  }
-
-  /**
-   * Reads {@code boundedSource} through an adapter reader, taking a checkpoint after every ninth
-   * element while continuing with the same reader, and checks that the elements read match
-   * {@code expectedElements} in number and as a set.
-   */
-  private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
-      BoundedSource<T> boundedSource,
-      List<T> expectedElements) throws Exception {
-    BoundedToUnboundedSourceAdapter<T> unboundedSource =
-        new BoundedToUnboundedSourceAdapter<>(boundedSource);
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    BoundedToUnboundedSourceAdapter<T>.Reader reader =
-        unboundedSource.createReader(options, null);
-
-    List<T> actual = Lists.newArrayList();
-    for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
-      actual.add(reader.getCurrent());
-      // checkpoint every 9 elements
-      if (actual.size() % 9 == 0) {
-        Checkpoint<T> checkpoint = reader.getCheckpointMark();
-        checkpoint.finalizeCheckpoint();
-      }
-    }
-    // Once everything has been read, a final checkpoint must hold no residual elements.
-    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
-    assertTrue(checkpointDone.getResidualElements() == null
-        || checkpointDone.getResidualElements().isEmpty());
-
-    // Compared by size and as sets, so ordering is not verified here.
-    assertEquals(expectedElements.size(), actual.size());
-    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
-  }
-
-  /** Runs the checkpoint-and-restart scenario against a splittable counting source. */
-  @Test
-  public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
-    long numElements = 100;
-    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
-    List<Long> expected = Lists.newArrayList();
-    long value = 0;
-    while (value < numElements) {
-      expected.add(value);
-      value++;
-    }
-    testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
-  }
-
-  /**
-   * Runs the checkpoint-and-restart scenario against an {@code UnsplittableSource} over a temp
-   * file.
-   */
-  @Test
-  public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
-    String baseName = "test-input";
-    File compressedFile = tmpFolder.newFile(baseName + ".gz");
-    byte[] input = generateInput(1000);
-    writeFile(compressedFile, input);
-
-    String path = compressedFile.getPath();
-    BoundedSource<Byte> source = new UnsplittableSource(path, 1);
-    List<Byte> expected = Lists.newArrayList();
-    for (int index = 0; index < input.length; index++) {
-      expected.add(input[index]);
-    }
-    testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
-  }
-
-  /**
-   * Reads {@code boundedSource} through adapter readers. After every ninth element the current
-   * reader is checkpointed, the checkpoint is round-tripped through its coder, the reader is
-   * closed, and reading resumes with a new reader created from the decoded checkpoint. Finally
-   * the elements read are compared with {@code expectedElements} in number and as a set.
-   */
-  private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(
-      BoundedSource<T> boundedSource,
-      List<T> expectedElements) throws Exception {
-    BoundedToUnboundedSourceAdapter<T> unboundedSource =
-        new BoundedToUnboundedSourceAdapter<>(boundedSource);
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    BoundedToUnboundedSourceAdapter<T>.Reader reader =
-        unboundedSource.createReader(options, null);
-
-    List<T> actual = Lists.newArrayList();
-    // hasNext is updated inside the body: by start() after a restart, by advance() otherwise.
-    for (boolean hasNext = reader.start(); hasNext;) {
-      actual.add(reader.getCurrent());
-      // checkpoint every 9 elements
-      if (actual.size() % 9 == 0) {
-        Checkpoint<T> checkpoint = reader.getCheckpointMark();
-        Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder();
-        Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray(
-            checkpointCoder,
-            CoderUtils.encodeToByteArray(checkpointCoder, checkpoint));
-        reader.close();
-        checkpoint.finalizeCheckpoint();
-
-        BoundedToUnboundedSourceAdapter<T>.Reader restarted =
-            unboundedSource.createReader(options, decodedCheckpoint);
-        reader = restarted;
-        hasNext = reader.start();
-      } else {
-        hasNext = reader.advance();
-      }
-    }
-    // Once everything has been read, a final checkpoint must hold no residual elements.
-    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
-    assertTrue(checkpointDone.getResidualElements() == null
-        || checkpointDone.getResidualElements().isEmpty());
-
-    // Compared by size and as sets, so ordering is not verified here.
-    assertEquals(expectedElements.size(), actual.size());
-    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
-  }
-
-  @Test
-  public void testReadBeforeStart() throws Exception {
-    thrown.expect(NoSuchElementException.class);
-
-    BoundedSource<Long> countingSource = CountingSource.upTo(100);
-    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
-        new BoundedToUnboundedSourceAdapter<>(countingSource);
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    unboundedSource.createReader(options, null).getCurrent();
-  }
-
-  @Test
-  public void testReadFromCheckpointBeforeStart() throws Exception {
-    thrown.expect(NoSuchElementException.class);
-
-    BoundedSource<Long> countingSource = CountingSource.upTo(100);
-    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
-        new BoundedToUnboundedSourceAdapter<>(countingSource);
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<TimestampedValue<Long>> elements =
-        ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
-    Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource);
-    unboundedSource.createReader(options, checkpoint).getCurrent();
-  }
-
-  /**
-   * Generate byte array of given size.
-   */
-  private static byte[] generateInput(int size) {
-    // Arbitrary but fixed seed
-    Random random = new Random(285930);
-    byte[] buff = new byte[size];
-    random.nextBytes(buff);
-    return buff;
-  }
-
-  /**
-   * Writes a single output file.
-   */
-  private static void writeFile(File file, byte[] input) throws IOException {
-    try (OutputStream os = new FileOutputStream(file)) {
-      os.write(input);
-    }
-  }
-
-  /**
-   * Unsplittable source for use in tests.
-   */
-  private static class UnsplittableSource extends FileBasedSource<Byte> {
-    public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
-      super(fileOrPatternSpec, minBundleSize);
-    }
-
-    public UnsplittableSource(
-        String fileName, long minBundleSize, long startOffset, long endOffset) {
-      super(fileName, minBundleSize, startOffset, endOffset);
-    }
-
-    @Override
-    protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
-      return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
-    }
-
-    @Override
-    protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
-      return new UnsplittableReader(this);
-    }
-
-    @Override
-    public Coder<Byte> getDefaultOutputCoder() {
-      return SerializableCoder.of(Byte.class);
-    }
-
-    private static class UnsplittableReader extends FileBasedReader<Byte> {
-      ByteBuffer buff = ByteBuffer.allocate(1);
-      Byte current;
-      long offset;
-      ReadableByteChannel channel;
-
-      public UnsplittableReader(UnsplittableSource source) {
-        super(source);
-        offset = source.getStartOffset() - 1;
-      }
-
-      @Override
-      public Byte getCurrent() throws NoSuchElementException {
-        return current;
-      }
-
-      @Override
-      public boolean allowsDynamicSplitting() {
-        return false;
-      }
-
-      @Override
-      protected boolean isAtSplitPoint() {
-        return true;
-      }
-
-      @Override
-      protected void startReading(ReadableByteChannel channel) throws IOException {
-        this.channel = channel;
-      }
-
-      @Override
-      protected boolean readNextRecord() throws IOException {
-        buff.clear();
-        if (channel.read(buff) != 1) {
-          return false;
-        }
-        current = buff.get(0);
-        offset += 1;
-        return true;
-      }
-
-      @Override
-      protected long getCurrentOffset() {
-        return offset;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a2b715f..7212d4f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
@@ -1193,7 +1194,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
-      return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
+      return Pipeline.applyTransform(input, new UnboundedReadFromBoundedSource<>(source))
           .setIsBoundedInternal(IsBounded.BOUNDED);
     }
   }


[3/3] beam git commit: This closes #2481

Posted by dh...@apache.org.
This closes #2481


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0e39a97
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0e39a97
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0e39a97

Branch: refs/heads/master
Commit: e0e39a97560933f99eaee98549877fd5f8f49a52
Parents: e53f959 6622914
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 10 10:16:06 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 10 10:16:06 2017 -0700

----------------------------------------------------------------------
 .../translation/ApexPipelineTranslator.java     |   2 +-
 runners/core-construction-java/pom.xml          |  15 +
 .../UnboundedReadFromBoundedSource.java         | 542 ++++++++++++++++++
 .../UnboundedReadFromBoundedSourceTest.java     | 373 +++++++++++++
 runners/core-java/pom.xml                       |   5 -
 .../core/UnboundedReadFromBoundedSource.java    | 542 ------------------
 .../UnboundedReadFromBoundedSourceTest.java     | 373 -------------
 .../beam/runners/dataflow/DataflowRunner.java   |   3 +-
 .../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
 ...aflowUnboundedReadFromBoundedSourceTest.java |  79 ---
 .../beam/runners/spark/TestSparkRunner.java     |   2 +-
 11 files changed, 934 insertions(+), 1549 deletions(-)
----------------------------------------------------------------------