Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:08 UTC

[44/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java
deleted file mode 100644
index be3a415..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link Source} that reads a finite amount of input and, because of that, supports
- * some additional operations.
- *
- * <p>The operations are:
- * <ul>
- * <li>Splitting into bundles of given size: {@link #splitIntoBundles};
- * <li>Size estimation: {@link #getEstimatedSizeBytes};
- * <li>Telling whether or not this source produces key/value pairs in sorted order:
- * {@link #producesSortedKeys};
- * <li>The reader ({@link BoundedReader}) supports progress estimation
- * ({@link BoundedReader#getFractionConsumed}) and dynamic splitting
- * ({@link BoundedReader#splitAtFraction}).
- * </ul>
- *
- * <p>To use this class for supporting your custom input type, derive your class
- * from it, and override the abstract methods. For an example, see {@link DatastoreIO}.
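- *
- * <p>As a rough sketch (the {@code FixedRangeSource} name and its members are
- * illustrative, not part of this SDK), a trivial source of the longs in
- * {@code [start, end)} might split itself as follows, assuming 8 bytes per element:
- *
- * <pre>{@code
- * class FixedRangeSource extends BoundedSource<Long> {
- *   private final long start, end;
- *
- *   FixedRangeSource(long start, long end) { this.start = start; this.end = end; }
- *
- *   public List<? extends BoundedSource<Long>> splitIntoBundles(
- *       long desiredBundleSizeBytes, PipelineOptions options) {
- *     List<FixedRangeSource> bundles = new ArrayList<>();
- *     long elementsPerBundle = Math.max(1, desiredBundleSizeBytes / 8);
- *     for (long i = start; i < end; i += elementsPerBundle) {
- *       bundles.add(new FixedRangeSource(i, Math.min(i + elementsPerBundle, end)));
- *     }
- *     return bundles;
- *   }
- *
- *   public long getEstimatedSizeBytes(PipelineOptions options) {
- *     return 8 * (end - start);
- *   }
- *   // ... producesSortedKeys, createReader, validate, getDefaultOutputCoder ...
- * }
- * }</pre>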
- *
- * @param <T> Type of records read by the source.
- */
-public abstract class BoundedSource<T> extends Source<T> {
-  /**
-   * Splits the source into bundles of approximately {@code desiredBundleSizeBytes}.
-   */
-  public abstract List<? extends BoundedSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception;
-
-  /**
-   * An estimate of the total size (in bytes) of the data that would be read from this source.
-   * This estimate is in terms of external storage size, before any decompression or other
-   * processing done by the reader.
-   */
-  public abstract long getEstimatedSizeBytes(PipelineOptions options) throws Exception;
-
-  /**
-   * Whether this source is known to produce key/value pairs sorted by lexicographic order on
-   * the bytes of the encoded key.
-   */
-  public abstract boolean producesSortedKeys(PipelineOptions options) throws Exception;
-
-  /**
-   * Returns a new {@link BoundedReader} that reads from this source.
-   */
-  public abstract BoundedReader<T> createReader(PipelineOptions options) throws IOException;
-
-  /**
-   * A {@code Reader} that reads a bounded amount of input and supports some additional
-   * operations, such as progress estimation and dynamic work rebalancing.
-   *
-   * <h3>Boundedness</h3>
-   * <p>Once {@link #start} or {@link #advance} has returned false, neither will be called
-   * again on this object.
-   *
-   * <h3>Thread safety</h3>
-   * All methods will be run from the same thread except {@link #splitAtFraction},
-   * {@link #getFractionConsumed} and {@link #getCurrentSource}, which can be called concurrently
- * from a different thread. There will not be multiple concurrent calls to
- * {@link #splitAtFraction}, but there can be multiple concurrent calls to
- * {@link #getFractionConsumed} if {@link #splitAtFraction} is implemented.
-   *
-   * <p>If the source does not implement {@link #splitAtFraction}, you do not need to worry about
-   * thread safety. If implemented, it must be safe to call {@link #splitAtFraction} and
-   * {@link #getFractionConsumed} concurrently with other methods.
-   *
-   * <p>Additionally, a successful {@link #splitAtFraction} call must, by definition, cause
-   * {@link #getCurrentSource} to start returning a different value.
-   * Callers of {@link #getCurrentSource} need to be aware of the possibility that the returned
-   * value can change at any time, and must only access the properties of the source returned by
-   * {@link #getCurrentSource} which do not change between {@link #splitAtFraction} calls.
-   *
-   * <h3>Implementing {@link #splitAtFraction}</h3>
-   * In the course of dynamic work rebalancing, the method {@link #splitAtFraction}
-   * may be called concurrently with {@link #advance} or {@link #start}. It is critical that
-   * their interaction is implemented in a thread-safe way, otherwise data loss is possible.
-   *
-   * <p>Sources that support dynamic work rebalancing should use
-   * {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to manage the (source-specific)
-   * range of positions that is being split; if using that class is not possible, please contact
-   * the team at <i>dataflow-feedback@google.com</i>.
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  public abstract static class BoundedReader<T> extends Source.Reader<T> {
-    /**
-     * Returns a value in [0, 1] representing approximately what fraction of the
-     * {@link #getCurrentSource current source} this reader has read so far, or {@code null} if such
-     * an estimate is not available.
-     *
-     * <p>It is recommended that this method satisfy the following properties:
-     * <ul>
-     *   <li>Should return 0 before the {@link #start} call.
-     *   <li>Should return 1 after a {@link #start} or {@link #advance} call that returns false.
-     *   <li>The returned values should be non-decreasing (though they don't have to be unique).
-     * </ul>
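-     *
-     * <p>As a sketch (assuming hypothetical {@code rangeStart}, {@code rangeEnd}, and
-     * {@code lastOffset} fields), an offset-based reader could approximate these
-     * properties with:
-     *
-     * <pre>{@code
-     * public Double getFractionConsumed() {
-     *   if (lastOffset < rangeStart) {
-     *     return 0.0;  // Before start().
-     *   }
-     *   return Math.min(1.0, (lastOffset - rangeStart) / (double) (rangeEnd - rangeStart));
-     * }
-     * }</pre>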
-     *
-     * <p>By default, returns null to indicate that this cannot be estimated.
-     *
-     * <h5>Thread safety</h5>
-     * If {@link #splitAtFraction} is implemented, this method can be called concurrently with
-     * other methods (including itself), and it is therefore critical for it to be implemented
-     * in a thread-safe way.
-     */
-    public Double getFractionConsumed() {
-      return null;
-    }
-
-    /**
-     * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
-     * (including items already read).
-     *
-     * <h3>Usage</h3>
-     * <p>Reader subclasses can use this method for convenience to access unchanging properties of
-     * the source being read. Alternatively, they can cache these properties in the constructor.
-     * <p>The framework will call this method in the course of dynamic work rebalancing, e.g. after
-     * a successful {@link BoundedSource.BoundedReader#splitAtFraction} call.
-     *
-     * <h3>Mutability and thread safety</h3>
-     * Remember that {@link Source} objects must always be immutable. However, the return value of
-     * this function may be affected by dynamic work rebalancing, happening asynchronously via
-     * {@link BoundedSource.BoundedReader#splitAtFraction}, meaning it can return a different
-     * {@link Source} object; the returned object will still itself be immutable.
-     * Callers must take care not to rely on properties of the returned source that may be
-     * asynchronously changed as a result of this process (e.g. do not cache an end offset when
-     * reading a file).
-     *
-     * <h3>Implementation</h3>
-     * For convenience, subclasses should usually return the most concrete subclass of
-     * {@link Source} possible.
-     * In practice, the implementation of this method should nearly always be one of the following:
-     * <ul>
-     *   <li>Source that inherits from a base class that already implements
-     *   {@link #getCurrentSource}: delegate to base class. In this case, it is almost always
-     *   an error for the subclass to maintain its own copy of the source.
-     * <pre>{@code
-     *   public FooReader(FooSource<T> source) {
-     *     super(source);
-     *   }
-     *
-     *   public FooSource<T> getCurrentSource() {
-     *     return (FooSource<T>)super.getCurrentSource();
-     *   }
-     * }</pre>
-     *   <li>Source that does not support dynamic work rebalancing: return a private final variable.
-     * <pre>{@code
-     *   private final FooSource<T> source;
-     *
-     *   public FooReader(FooSource<T> source) {
-     *     this.source = source;
-     *   }
-     *
-     *   public FooSource<T> getCurrentSource() {
-     *     return source;
-     *   }
-     * }</pre>
-     *   <li>{@link BoundedSource.BoundedReader} that explicitly supports dynamic work rebalancing:
-     *   maintain a variable pointing to an immutable source object, and protect it with
-     *   synchronization.
-     * <pre>{@code
-     *   private FooSource<T> source;
-     *
-     *   public FooReader(FooSource<T> source) {
-     *     this.source = source;
-     *   }
-     *
-     *   public synchronized FooSource<T> getCurrentSource() {
-     *     return source;
-     *   }
-     *
-     *   public synchronized FooSource<T> splitAtFraction(double fraction) {
-     *     ...
-     *     FooSource<T> primary = ...;
-     *     FooSource<T> residual = ...;
-     *     this.source = primary;
-     *     return residual;
-     *   }
-     * }</pre>
-     * </ul>
-     */
-    @Override
-    public abstract BoundedSource<T> getCurrentSource();
-
-    /**
-     * Tells the reader to narrow the range of the input it's going to read and give up
-     * the remainder, so that the new range would contain approximately the given
-     * fraction of the amount of data in the current range.
-     *
-     * <p>Returns a {@code BoundedSource} representing the remainder.
-     *
-     * <h5>Detailed description</h5>
-     * Assuming the following sequence of calls:
-     * <pre>{@code
-     *   BoundedSource<T> initial = reader.getCurrentSource();
-     *   BoundedSource<T> residual = reader.splitAtFraction(fraction);
-     *   BoundedSource<T> primary = reader.getCurrentSource();
-     * }</pre>
-     * <ul>
-     *  <li> The "primary" and "residual" sources, when read, should together cover the same
-     *  set of records as "initial".
-     *  <li> The current reader should continue to be in a valid state, and continuing to read
-     *  from it should, together with the records it already read, yield the same records
-     *  as would have been read by "primary".
-     *  <li> The amount of data read by "primary" should ideally represent approximately
-     *  the given fraction of the amount of data read by "initial".
-     * </ul>
-     * For example, a reader that reads a range of offsets <i>[A, B)</i> in a file might implement
-     * this method by truncating the current range to <i>[A, A + fraction*(B-A))</i> and returning
-     * a Source representing the range <i>[A + fraction*(B-A), B)</i>.
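-     *
-     * <p>A sketch of that offset arithmetic (the {@code OffsetSource} type, the
-     * {@code forSubrange} helper, and the offset fields here are hypothetical, with all
-     * shared state guarded by {@code synchronized} as described above):
-     *
-     * <pre>{@code
-     * public synchronized OffsetSource<T> splitAtFraction(double fraction) {
-     *   long splitOffset = rangeStart + (long) (fraction * (rangeEnd - rangeStart));
-     *   if (splitOffset <= lastReturnedOffset || splitOffset >= rangeEnd) {
-     *     return null;  // Too late (or degenerate) to split; reader is unchanged.
-     *   }
-     *   OffsetSource<T> residual = source.forSubrange(splitOffset, rangeEnd);
-     *   this.source = source.forSubrange(rangeStart, splitOffset);
-     *   rangeEnd = splitOffset;
-     *   return residual;
-     * }
-     * }</pre>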
-     *
-     * <p>This method should return {@code null} if the split cannot be performed for this fraction
-     * while satisfying the semantics above. E.g., a reader that reads a range of offsets
-     * in a file should return {@code null} if it is already past the position in its range
-     * corresponding to the given fraction. In this case, the method MUST have no effect
-     * (the reader must behave as if the method hadn't been called at all).
-     *
-     * <h5>Statefulness</h5>
-     * Since this method (if successful) affects the reader's source, in subsequent invocations
-     * "fraction" should be interpreted relative to the new current source.
-     *
-     * <h5>Thread safety and blocking</h5>
-     * This method will be called concurrently with other methods (however, there will not be
-     * multiple concurrent invocations of this method itself), and it is critical for it to be
-     * implemented in a thread-safe way (otherwise data loss is possible).
-     *
-     * <p>It is also very important that this method always completes quickly. In particular,
-     * it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating
-     * this requirement may stall completion of the work item or even cause it to fail.
-     *
-     * <p>It is incorrect to make both this method and {@link #start}/{@link #advance}
-     * {@code synchronized}, because those methods can perform blocking operations, and then
-     * this method would have to wait for those calls to complete.
-     *
-     * <p>{@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} makes it easy to implement
-     * this method safely and correctly.
-     *
-     * <p>By default, returns null to indicate that splitting is not possible.
-     */
-    public BoundedSource<T> splitAtFraction(double fraction) {
-      return null;
-    }
-
-    /**
-     * By default, returns the minimum possible timestamp.
-     */
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
deleted file mode 100644
index e3dca91..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Ints;
-
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-
-import java.io.IOException;
-import java.io.PushbackInputStream;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.NoSuchElementException;
-import java.util.zip.GZIPInputStream;
-
-/**
- * A Source that reads from compressed files. A {@code CompressedSource} wraps a delegate
- * {@link FileBasedSource} that is able to read the decompressed file format.
- *
- * <p>For example, use the following to read from a gzip-compressed XML file:
- *
- * <pre> {@code
- * XmlSource mySource = XmlSource.from(...);
- * PCollection<T> collection = p.apply(Read.from(CompressedSource
- *     .from(mySource)
- *     .withDecompression(CompressedSource.CompressionMode.GZIP)));
- * } </pre>
- *
- * <p>Supported compression algorithms are {@link CompressionMode#GZIP} and
- * {@link CompressionMode#BZIP2}. User-defined compression types are supported by implementing
- * {@link DecompressingChannelFactory}.
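- *
- * <p>As an illustration, a user-defined factory for raw DEFLATE data might look like
- * the following sketch ({@code DeflateChannelFactory} is a hypothetical name, not part
- * of this SDK):
- *
- * <pre>{@code
- * class DeflateChannelFactory implements CompressedSource.DecompressingChannelFactory {
- *   public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
- *       throws IOException {
- *     // java.util.zip: Inflater(true) selects raw DEFLATE (no zlib/gzip header).
- *     return Channels.newChannel(new InflaterInputStream(
- *         Channels.newInputStream(channel), new Inflater(true)));
- *   }
- * }
- * }</pre>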
- *
- * <p>By default, the compression algorithm is selected from those supported in
- * {@link CompressionMode} based on the file name provided to the source, namely
- * {@code ".bz2"} indicates {@link CompressionMode#BZIP2} and {@code ".gz"} indicates
- * {@link CompressionMode#GZIP}. If the file name does not match any of the supported
- * algorithms, it is assumed to be uncompressed data.
- *
- * @param <T> The type to read from the compressed file.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class CompressedSource<T> extends FileBasedSource<T> {
-  /**
-   * Factory interface for creating channels that decompress the content of an underlying channel.
-   */
-  public interface DecompressingChannelFactory extends Serializable {
-    /**
-     * Given a channel, create a channel that decompresses the content read from the channel.
-     * @throws IOException
-     */
-    public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-        throws IOException;
-  }
-
-  /**
-   * Factory interface for creating channels that decompress the content of an underlying channel,
-   * based on both the channel and the file name.
-   */
-  private interface FileNameBasedDecompressingChannelFactory
-      extends DecompressingChannelFactory {
-    /**
-     * Given a channel, create a channel that decompresses the content read from the channel.
-     * @throws IOException
-     */
-    ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
-        throws IOException;
-
-    /**
-     * Given a file name, returns true if the file name matches any supported compression
-     * scheme.
-     */
-    boolean isCompressed(String fileName);
-  }
-
-  /**
-   * Default compression types supported by the {@code CompressedSource}.
-   */
-  public enum CompressionMode implements DecompressingChannelFactory {
-    /**
-     * Reads a byte channel assuming it is compressed with gzip.
-     */
-    GZIP {
-      @Override
-      public boolean matches(String fileName) {
-          return fileName.toLowerCase().endsWith(".gz");
-      }
-
-      @Override
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        // Determine if the input stream is gzipped. The input stream returned from the
-        // GCS connector may already be decompressed; GCS does this based on the
-        // content-encoding property.
-        PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
-        byte[] headerBytes = new byte[2];
-        int bytesRead = ByteStreams.read(
-            stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
-        stream.unread(headerBytes, 0, bytesRead);
-        if (bytesRead >= 2) {
-          byte zero = 0x00;
-          int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
-          if (header == GZIPInputStream.GZIP_MAGIC) {
-            return Channels.newChannel(new GzipCompressorInputStream(stream));
-          }
-        }
-        return Channels.newChannel(stream);
-      }
-    },
-
-    /**
-     * Reads a byte channel assuming it is compressed with bzip2.
-     */
-    BZIP2 {
-      @Override
-      public boolean matches(String fileName) {
-          return fileName.toLowerCase().endsWith(".bz2");
-      }
-
-      @Override
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        return Channels.newChannel(
-            new BZip2CompressorInputStream(Channels.newInputStream(channel)));
-      }
-    };
-
-    /**
-     * Returns {@code true} if the given file name implies that the contents are compressed
-     * according to the compression embodied by this factory.
-     */
-    public abstract boolean matches(String fileName);
-
-    @Override
-    public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-        throws IOException;
-  }
-
-  /**
-   * Reads a byte channel, detecting compression according to the file name. If the file name
-   * does not match any known {@link CompressionMode}, the input is presumed to be uncompressed.
-   */
-  private static class DecompressAccordingToFilename
-      implements FileNameBasedDecompressingChannelFactory {
-
-    @Override
-    public ReadableByteChannel createDecompressingChannel(
-        String fileName, ReadableByteChannel channel) throws IOException {
-      for (CompressionMode type : CompressionMode.values()) {
-        if (type.matches(fileName)) {
-          return type.createDecompressingChannel(channel);
-        }
-      }
-      // Uncompressed
-      return channel;
-    }
-
-    @Override
-    public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) {
-      throw new UnsupportedOperationException(
-          String.format("%s does not support createDecompressingChannel(%s) but only"
-              + " createDecompressingChannel(%s,%s)",
-              getClass().getSimpleName(),
-              ReadableByteChannel.class.getSimpleName(),
-              String.class.getSimpleName(),
-              ReadableByteChannel.class.getSimpleName()));
-    }
-
-    @Override
-    public boolean isCompressed(String fileName) {
-      for (CompressionMode type : CompressionMode.values()) {
-        if (type.matches(fileName)) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-
-  private final FileBasedSource<T> sourceDelegate;
-  private final DecompressingChannelFactory channelFactory;
-
-  /**
-   * Creates a {@link Read} transform that reads from the underlying
-   * {@link FileBasedSource} {@code sourceDelegate} after decompressing it with a {@link
-   * DecompressingChannelFactory}.
-   */
-  public static <T> Read.Bounded<T> readFromSource(
-      FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    return Read.from(new CompressedSource<>(sourceDelegate, channelFactory));
-  }
-
-  /**
-   * Creates a {@code CompressedSource} from an underlying {@code FileBasedSource}. The type
-   * of compression used will be based on the file name extension unless explicitly
-   * configured via {@link CompressedSource#withDecompression}.
-   */
-  public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) {
-    return new CompressedSource<>(sourceDelegate, new DecompressAccordingToFilename());
-  }
-
-  /**
-   * Returns a {@code CompressedSource} like this one, but which will decompress its underlying file
-   * with the given {@link DecompressingChannelFactory}.
-   */
-  public CompressedSource<T> withDecompression(DecompressingChannelFactory channelFactory) {
-    return new CompressedSource<>(this.sourceDelegate, channelFactory);
-  }
-
-  /**
-   * Creates a {@code CompressedSource} from a delegate file based source and a decompressing
-   * channel factory.
-   */
-  private CompressedSource(
-      FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE);
-    this.sourceDelegate = sourceDelegate;
-    this.channelFactory = channelFactory;
-  }
-
-  /**
-   * Creates a {@code CompressedSource} for an individual file. Used by {@link
-   * CompressedSource#createForSubrangeOfFile}.
-   */
-  private CompressedSource(FileBasedSource<T> sourceDelegate,
-      DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize,
-      long startOffset, long endOffset) {
-    super(filePatternOrSpec, minBundleSize, startOffset, endOffset);
-    Preconditions.checkArgument(
-        startOffset == 0,
-        "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
-    this.sourceDelegate = sourceDelegate;
-    this.channelFactory = channelFactory;
-  }
-
-  /**
-   * Validates that the delegate source is a valid source and that the channel factory is not null.
-   */
-  @Override
-  public void validate() {
-    super.validate();
-    Preconditions.checkNotNull(sourceDelegate);
-    sourceDelegate.validate();
-    Preconditions.checkNotNull(channelFactory);
-  }
-
-  /**
-   * Creates a {@code CompressedSource} for a subrange of a file. Called by superclass to create a
-   * source for a single file.
-   */
-  @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end),
-        channelFactory, fileName, Long.MAX_VALUE, start, end);
-  }
-
-  /**
-   * Determines whether a single file represented by this source is splittable. Returns true
-   * if we are using the default decompression factory and it determines
-   * from the requested file name that the file is not compressed.
-   */
-  @Override
-  protected final boolean isSplittable() throws Exception {
-    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
-          (FileNameBasedDecompressingChannelFactory) channelFactory;
-      return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec());
-    }
-    return true;
-  }
-
-  /**
-   * Creates a {@code FileBasedReader} to read a single file.
-   *
-   * <p>Uses the delegate source to create a single-file reader for that source.
-   * When the default file-name-based decompression channel factory is in use and the
-   * file name does not indicate a compressed file, the delegate reader is returned
-   * unwrapped, which allows the source to be split.
-   */
-  @Override
-  protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
-          (FileNameBasedDecompressingChannelFactory) channelFactory;
-      if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) {
-        return sourceDelegate.createSingleFileReader(options);
-      }
-    }
-    return new CompressedReader<T>(
-        this, sourceDelegate.createSingleFileReader(options));
-  }
-
-  /**
-   * Returns whether the delegate source produces sorted keys.
-   */
-  @Override
-  public final boolean producesSortedKeys(PipelineOptions options) throws Exception {
-    return sourceDelegate.producesSortedKeys(options);
-  }
-
-  /**
-   * Returns the delegate source's default output coder.
-   */
-  @Override
-  public final Coder<T> getDefaultOutputCoder() {
-    return sourceDelegate.getDefaultOutputCoder();
-  }
-
-  public final DecompressingChannelFactory getChannelFactory() {
-    return channelFactory;
-  }
-
-  /**
-   * Reader for a {@link CompressedSource}. Decompresses its input and uses a delegate
-   * reader to read elements from the decompressed input.
-   * @param <T> The type of records read from the source.
-   */
-  public static class CompressedReader<T> extends FileBasedReader<T> {
-
-    private final FileBasedReader<T> readerDelegate;
-    private final CompressedSource<T> source;
-    private int numRecordsRead;
-
-    /**
-     * Creates a {@code CompressedReader} from a {@code CompressedSource} and a delegate reader.
-     */
-    public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
-      super(source);
-      this.source = source;
-      this.readerDelegate = readerDelegate;
-    }
-
-    /**
-     * Gets the current record from the delegate reader.
-     */
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      return readerDelegate.getCurrent();
-    }
-
-    /**
-     * Returns true only for the first record; compressed sources cannot be split.
-     */
-    @Override
-    protected final boolean isAtSplitPoint() {
-      // We have to return true for the first record, but not for the state before reading it,
-      // and not for the state after reading any other record. Hence == rather than >= or <=.
-      // This is required because FileBasedReader is intended for readers that can read a range
-      // of offsets in a file and where the range can be split in parts. CompressedReader,
-      // however, is a degenerate case because it cannot be split, but it has to satisfy the
-      // semantics of offsets and split points anyway.
-      return numRecordsRead == 1;
-    }
-
-    /**
-     * Creates a decompressing channel from the input channel and passes it to its delegate reader's
-     * {@link FileBasedReader#startReading(ReadableByteChannel)}.
-     */
-    @Override
-    protected final void startReading(ReadableByteChannel channel) throws IOException {
-      if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) {
-        FileNameBasedDecompressingChannelFactory channelFactory =
-            (FileNameBasedDecompressingChannelFactory) source.getChannelFactory();
-        readerDelegate.startReading(channelFactory.createDecompressingChannel(
-            getCurrentSource().getFileOrPatternSpec(),
-            channel));
-      } else {
-        readerDelegate.startReading(source.getChannelFactory().createDecompressingChannel(
-            channel));
-      }
-    }
-
-    /**
-     * Reads the next record via the delegate reader.
-     */
-    @Override
-    protected final boolean readNextRecord() throws IOException {
-      if (!readerDelegate.readNextRecord()) {
-        return false;
-      }
-      ++numRecordsRead;
-      return true;
-    }
-
-    /**
-     * Returns the delegate reader's current offset in the decompressed input.
-     */
-    @Override
-    protected final long getCurrentOffset() {
-      return readerDelegate.getCurrentOffset();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
deleted file mode 100644
index 07609ba..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn;
-import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.common.base.Optional;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A {@link PTransform} that produces longs. When used to produce a
- * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
- * and counts up to a specified maximum. When used to produce an
- * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
- * and then never produces more output. (In practice, this limit should never be reached.)
- *
- * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
- * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
- * supports dynamic work rebalancing.
- *
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
- *
- * <pre>{@code
- * Pipeline p = ...
- * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
- * PCollection<Long> bounded = p.apply(producer);
- * }</pre>
- *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
- * optionally calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to
- * provide values with timestamps other than {@link Instant#now}.
- *
- * <pre>{@code
- * Pipeline p = ...
- *
- * // To create an unbounded producer that uses processing time as the element timestamp.
- * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
- * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * PCollection<Long> unboundedWithTimestamps =
- *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
- * }</pre>
- */
-public class CountingInput {
-  /**
-   * Creates a {@link BoundedCountingInput} that will produce the specified number of elements,
-   * from {@code 0} to {@code numElements - 1}.
-   */
-  public static BoundedCountingInput upTo(long numElements) {
-    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
-    return new BoundedCountingInput(numElements);
-  }
-
-  /**
-   * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
-   * to {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
-   * timestamps corresponding to processing time at element generation, provided by
-   * {@link Instant#now}. Use the transform returned by
-   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
-   * timestamps.
-   */
-  public static UnboundedCountingInput unbounded() {
-    return new UnboundedCountingInput(
-        new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
-  }
-
-  /**
-   * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from
-   * 0.
-   */
-  public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    private final long numElements;
-
-    private BoundedCountingInput(long numElements) {
-      this.numElements = numElements;
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public PCollection<Long> apply(PBegin begin) {
-      return begin.apply(Read.from(CountingSource.upTo(numElements)));
-    }
-  }
-
-  /**
-   * A {@link PTransform} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
-   * timestamps corresponding to processing time at element generation, provided by
-   * {@link Instant#now}. Use the transform returned by
-   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
-   * timestamps.
-   */
-  public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    private final SerializableFunction<Long, Instant> timestampFn;
-    private final Optional<Long> maxNumRecords;
-    private final Optional<Duration> maxReadTime;
-
-    private UnboundedCountingInput(
-        SerializableFunction<Long, Instant> timestampFn,
-        Optional<Long> maxNumRecords,
-        Optional<Duration> maxReadTime) {
-      this.timestampFn = timestampFn;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the
-     * timestamp specified by {@code timestampFn}.
-     *
-     * <p>Note that the timestamps produced by {@code timestampFn} must not decrease.
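-     *
-     * <p>For example, a sketch of a function that stamps element {@code i} with the instant
-     * {@code i} milliseconds past the epoch (nondecreasing because {@code i} only increases):
-     *
-     * <pre>{@code
-     * SerializableFunction<Long, Instant> timestampFn =
-     *     new SerializableFunction<Long, Instant>() {
-     *       public Instant apply(Long i) {
-     *         return new Instant(i);
-     *       }
-     *     };
-     * }</pre>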
-     */
-    public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
-      return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the
-     * specified number of elements.
-     *
-     * <p>A bounded number of elements will be produced by the result transform, and the result
-     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
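-     *
-     * <p>For example (a sketch, given a {@code Pipeline p}):
-     * <pre>{@code
-     * PCollection<Long> firstThousand =
-     *     p.apply(CountingInput.unbounded().withMaxNumRecords(1000L));
-     * }</pre>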
-     */
-    public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
-      checkArgument(
-          maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
-      return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the
-     * specified amount of time.
-     *
-     * <p>A bounded number of elements will be produced by the result transform, and the result
-     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
-     */
-    public UnboundedCountingInput withMaxReadTime(Duration readTime) {
-      checkNotNull(readTime, "ReadTime cannot be null");
-      return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public PCollection<Long> apply(PBegin begin) {
-      Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
-      if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
-        return begin.apply(read);
-      } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
-        return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
-      } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
-        return begin.apply(read.withMaxReadTime(maxReadTime.get()));
-      } else {
-        return begin.apply(
-            read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
deleted file mode 100644
index 412f3a7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A source that produces longs. When used as a {@link BoundedSource}, {@link CountingSource}
- * starts at {@code 0} and counts up to a specified maximum. When used as an
- * {@link UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then never produces more
- * output. (In practice, this limit should never be reached.)
- *
- * <p>The bounded {@link CountingSource} is implemented based on {@link OffsetBasedSource} and
- * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
- * supports dynamic work rebalancing.
- *
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
- *
- * <pre>{@code
- * Pipeline p = ...
- * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
- * PCollection<Long> bounded = p.apply(producer);
- * }</pre>
- *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
- * optionally calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to
- * provide values with timestamps other than {@link Instant#now}.
- *
- * <pre>{@code
- * Pipeline p = ...
- *
- * // To create an unbounded PCollection that uses processing time as the element timestamp.
- * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
- * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * PCollection<Long> unboundedWithTimestamps =
- *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
- * }</pre>
- */
-public class CountingSource {
-  /**
-   * Creates a {@link BoundedSource} that will produce the specified number of elements,
-   * from {@code 0} to {@code numElements - 1}.
-   *
-   * @deprecated use {@link CountingInput#upTo(long)} instead
-   */
-  @Deprecated
-  public static BoundedSource<Long> upTo(long numElements) {
-    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
-    return new BoundedCountingSource(0, numElements);
-  }
-
-  /**
-   * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will have timestamps
-   * corresponding to processing time at element generation, provided by {@link Instant#now}.
-   *
-   * @deprecated use {@link CountingInput#unbounded()} instead
-   */
-  @Deprecated
-  public static UnboundedSource<Long, CounterMark> unbounded() {
-    return unboundedWithTimestampFn(new NowTimestampFn());
-  }
-
-  /**
-   * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}, with element timestamps supplied by the specified function.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Note that the timestamps produced by {@code timestampFn} must not decrease.
-   *
-   * @deprecated use {@link CountingInput#unbounded()} and call
-   *             {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
-   */
-  @Deprecated
-  public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
-      SerializableFunction<Long, Instant> timestampFn) {
-    return new UnboundedCountingSource(0, 1, timestampFn);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////////////////////
-
-  /** Prevent instantiation. */
-  private CountingSource() {}
-
-  /**
-   * A function that returns {@link Instant#now} as the timestamp for each generated element.
-   */
-  static class NowTimestampFn implements SerializableFunction<Long, Instant> {
-    @Override
-    public Instant apply(Long input) {
-      return Instant.now();
-    }
-  }
-
-  /**
-   * An implementation of {@link CountingSource} that produces a bounded {@link PCollection}.
-   * It is implemented on top of {@link OffsetBasedSource} (with associated reader
-   * {@link BoundedCountingReader}) and performs efficient initial splitting and supports dynamic
-   * work rebalancing.
-   */
-  private static class BoundedCountingSource extends OffsetBasedSource<Long> {
-    /**
-     * Creates a {@link BoundedCountingSource} that generates the numbers in the specified
-     * {@code [start, end)} range.
-     */
-    public BoundedCountingSource(long start, long end) {
-      super(start, end, 1 /* can be split every 1 offset */);
-    }
-
-    ////////////////////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public long getBytesPerOffset() {
-      return 8;
-    }
-
-    @Override
-    public long getMaxEndOffset(PipelineOptions options) throws Exception {
-      return getEndOffset();
-    }
-
-    @Override
-    public OffsetBasedSource<Long> createSourceForSubrange(long start, long end) {
-      return new BoundedCountingSource(start, end);
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return true;
-    }
-
-    @Override
-    public BoundedSource.BoundedReader<Long> createReader(PipelineOptions options)
-        throws IOException {
-      return new BoundedCountingReader(this);
-    }
-
-    @Override
-    public Coder<Long> getDefaultOutputCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * The reader associated with {@link BoundedCountingSource}.
-   *
-   * @see BoundedCountingSource
-   */
-  private static class BoundedCountingReader extends OffsetBasedSource.OffsetBasedReader<Long> {
-    private long current;
-
-    public BoundedCountingReader(OffsetBasedSource<Long> source) {
-      super(source);
-    }
-
-    @Override
-    protected long getCurrentOffset() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    public synchronized BoundedCountingSource getCurrentSource() {
-      return (BoundedCountingSource) super.getCurrentSource();
-    }
-
-    @Override
-    public Long getCurrent() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    protected boolean startImpl() throws IOException {
-      current = getCurrentSource().getStartOffset();
-      return true;
-    }
-
-    @Override
-    protected boolean advanceImpl() throws IOException {
-      current++;
-      return true;
-    }
-
-    @Override
-    public void close() throws IOException {}
-  }
-
-  /**
-   * An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
-   */
-  private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
-    /** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
-    private final long start;
-    /** The interval between numbers generated by this {@link UnboundedCountingSource}. */
-    private final long stride;
-    /** The function used to produce timestamps for the generated elements. */
-    private final SerializableFunction<Long, Instant> timestampFn;
-
-    /**
-     * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
-     * {@link Long#MAX_VALUE}, with element timestamps supplied by the specified function.
-     *
-     * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
-     * limit should never be reached.)
-     *
-     * <p>Note that the timestamps produced by {@code timestampFn} must not decrease.
-     */
-    public UnboundedCountingSource(
-        long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
-      this.start = start;
-      this.stride = stride;
-      this.timestampFn = timestampFn;
-    }
-
-    /**
-     * Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
-     * {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
-     * produces.
-     *
-     * <p>E.g., if a source produces all even numbers {@code [0, 2, 4, 6, 8, ...)} and we want to
-     * split into 3 new sources, then the new sources will produce numbers that are 6 apart and
-     * are offset at the start by the original stride: {@code [0, 6, 12, ...)},
-     * {@code [2, 8, 14, ...)}, and {@code [4, 10, 16, ...)}.
-     */
-    @Override
-    public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      // Using Javadoc example, stride 2 with 3 splits becomes stride 6.
-      long newStride = stride * desiredNumSplits;
-
-      ImmutableList.Builder<UnboundedCountingSource> splits = ImmutableList.builder();
-      for (int i = 0; i < desiredNumSplits; ++i) {
-        // Starts offset by the original stride. Using Javadoc example, this generates starts of
-        // 0, 2, and 4.
-        splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
-      }
-      return splits.build();
-    }
-
-    @Override
-    public UnboundedReader<Long> createReader(
-        PipelineOptions options, CounterMark checkpointMark) {
-      return new UnboundedCountingReader(this, checkpointMark);
-    }
-
-    @Override
-    public Coder<CountingSource.CounterMark> getCheckpointMarkCoder() {
-      return AvroCoder.of(CountingSource.CounterMark.class);
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<Long> getDefaultOutputCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * The reader associated with {@link UnboundedCountingSource}.
-   *
-   * @see UnboundedCountingSource
-   */
-  private static class UnboundedCountingReader extends UnboundedReader<Long> {
-    private UnboundedCountingSource source;
-    private long current;
-    private Instant currentTimestamp;
-
-    public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
-      this.source = source;
-      if (mark == null) {
-        // Because we have not emitted an element yet, and start() calls advance, we need to
-        // "un-advance" so that start() produces the correct output.
-        this.current = source.start - source.stride;
-      } else {
-        this.current = mark.getLastEmitted();
-      }
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      // Overflow-safe check that (current + source.stride) <= Long.MAX_VALUE; else, stop producing.
-      if (Long.MAX_VALUE - source.stride < current) {
-        return false;
-      }
-      current += source.stride;
-      currentTimestamp = source.timestampFn.apply(current);
-      return true;
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return source.timestampFn.apply(current);
-    }
-
-    @Override
-    public CounterMark getCheckpointMark() {
-      return new CounterMark(current);
-    }
-
-    @Override
-    public UnboundedSource<Long, CounterMark> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public Long getCurrent() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return currentTimestamp;
-    }
-
-    @Override
-    public void close() throws IOException {}
-  }
-
-  /**
-   * The checkpoint for an unbounded {@link CountingSource} is simply the last value produced. The
-   * associated source object encapsulates the information needed to produce the next value.
-   */
-  @DefaultCoder(AvroCoder.class)
-  public static class CounterMark implements UnboundedSource.CheckpointMark {
-    /** The last value emitted. */
-    private final long lastEmitted;
-
-    /**
-     * Creates a checkpoint mark reflecting the last emitted value.
-     */
-    public CounterMark(long lastEmitted) {
-      this.lastEmitted = lastEmitted;
-    }
-
-    /**
-     * Returns the last value emitted by the reader.
-     */
-    public long getLastEmitted() {
-      return lastEmitted;
-    }
-
-    /////////////////////////////////////////////////////////////////////////////////////
-
-    @SuppressWarnings("unused") // For AvroCoder
-    private CounterMark() {
-      this.lastEmitted = 0L;
-    }
-
-    @Override
-    public void finalizeCheckpoint() throws IOException {}
-  }
-}