You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:02 UTC

[15/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
deleted file mode 100644
index 3ad32b4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-
-/**
- * A common base class for all file-based {@link Source}s. Extend this class to implement your own
- * file-based custom source.
- *
- * <p>A file-based {@code Source} is a {@code Source} backed by a file pattern defined as a Java
- * glob, a single file, or an offset range for a single file. See {@link OffsetBasedSource} and
- * {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} for semantics of offset ranges.
- *
- * <p>This source stores a {@code String} that is an {@link IOChannelFactory} specification for a
- * file or file pattern. There should be an {@code IOChannelFactory} defined for the file
- * specification provided. Please refer to {@link IOChannelUtils} and {@link IOChannelFactory} for
- * more information on this.
- *
- * <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
- * methods to create a sub-source and a reader for a range of a single file -
- * {@link #createForSubrangeOfFile} and {@link #createSingleFileReader}. Please refer to
- * {@link XmlSource} for an example implementation of {@code FileBasedSource}.
- *
- * @param <T> Type of records represented by the source.
- */
-public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
-  private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;
-
-  // Package-private for testing
-  static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;
-
-  // Size of the thread pool to be used for performing file operations in parallel.
-  // Package-private for testing.
-  static final int THREAD_POOL_SIZE = 128;
-
-  private final String fileOrPatternSpec;
-  private final Mode mode;
-
-  /**
-   * A given {@code FileBasedSource} represents a file resource of one of these types.
-   */
-  public enum Mode {
-    FILEPATTERN,
-    SINGLE_FILE_OR_SUBRANGE
-  }
-
-  /**
-   * Create a {@code FileBasedSource} based on a file or a file pattern specification. This
-   * constructor must be used when creating a new {@code FileBasedSource} for a file pattern.
-   *
-   * <p>See {@link OffsetBasedSource} for a detailed description of {@code minBundleSize}.
-   *
-   * @param fileOrPatternSpec {@link IOChannelFactory} specification of file or file pattern
-   *        represented by the {@link FileBasedSource}.
-   * @param minBundleSize minimum bundle size in bytes.
-   */
-  public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
-    super(0, Long.MAX_VALUE, minBundleSize);
-    mode = Mode.FILEPATTERN;
-    this.fileOrPatternSpec = fileOrPatternSpec;
-  }
-
-  /**
-   * Create a {@code FileBasedSource} based on a single file. This constructor must be used when
-   * creating a new {@code FileBasedSource} for a subrange of a single file.
-   * Additionally, this constructor must be used to create new {@code FileBasedSource}s when
-   * subclasses implement the method {@link #createForSubrangeOfFile}.
-   *
-   * <p>See {@link OffsetBasedSource} for detailed descriptions of {@code minBundleSize},
-   * {@code startOffset}, and {@code endOffset}.
-   *
-   * @param fileName {@link IOChannelFactory} specification of the file represented by the
-   *        {@link FileBasedSource}.
-   * @param minBundleSize minimum bundle size in bytes.
-   * @param startOffset starting byte offset.
-   * @param endOffset ending byte offset. If the specified value {@code >= #getMaxEndOffset()} it
-   *        implies {@code #getMaxEndOffset()}.
-   */
-  public FileBasedSource(String fileName, long minBundleSize,
-      long startOffset, long endOffset) {
-    super(startOffset, endOffset, minBundleSize);
-    mode = Mode.SINGLE_FILE_OR_SUBRANGE;
-    this.fileOrPatternSpec = fileName;
-  }
-
-  public final String getFileOrPatternSpec() {
-    return fileOrPatternSpec;
-  }
-
-  public final Mode getMode() {
-    return mode;
-  }
-
-  @Override
-  public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
-    checkArgument(mode != Mode.FILEPATTERN,
-        "Cannot split a file pattern based source based on positions");
-    checkArgument(start >= getStartOffset(),
-        "Start offset value %s of the subrange cannot be smaller than the start offset value %s"
-            + " of the parent source",
-        start,
-        getStartOffset());
-    checkArgument(end <= getEndOffset(),
-        "End offset value %s of the subrange cannot be larger than the end offset value %s",
-        end,
-        getEndOffset());
-
-    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
-    if (start > 0 || end != Long.MAX_VALUE) {
-      checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
-          "Source created for the range [%s,%s) must be a subrange source", start, end);
-    }
-    return source;
-  }
-
-  /**
-   * Creates and returns a new {@code FileBasedSource} of the same type as the current
-   * {@code FileBasedSource} backed by a given file and an offset range. When current source is
-   * being split, this method is used to generate new sub-sources. When creating the source
-   * subclasses must call the constructor {@link #FileBasedSource(String, long, long, long)} of
-   * {@code FileBasedSource} with corresponding parameter values passed here.
-   *
-   * @param fileName file backing the new {@code FileBasedSource}.
-   * @param start starting byte offset of the new {@code FileBasedSource}.
-   * @param end ending byte offset of the new {@code FileBasedSource}. May be Long.MAX_VALUE,
-   *        in which case it will be inferred using {@link #getMaxEndOffset}.
-   */
-  protected abstract FileBasedSource<T> createForSubrangeOfFile(
-      String fileName, long start, long end);
-
-  /**
-   * Creates and returns an instance of a {@code FileBasedReader} implementation for the current
-   * source assuming the source represents a single file. File patterns will be handled by
-   * {@code FileBasedSource} implementation automatically.
-   */
-  protected abstract FileBasedReader<T> createSingleFileReader(
-      PipelineOptions options);
-
-  @Override
-  public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
-    // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
-    // we perform the size estimation of files and file patterns using the interface provided by
-    // IOChannelFactory.
-
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    if (mode == Mode.FILEPATTERN) {
-      // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
-      long startTime = System.currentTimeMillis();
-      long totalSize = 0;
-      Collection<String> inputs = factory.match(fileOrPatternSpec);
-      if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
-        totalSize = getExactTotalSizeOfFiles(inputs, factory);
-        LOG.debug("Size estimation of all files of pattern {} took {} ms",
-            fileOrPatternSpec,
-            System.currentTimeMillis() - startTime);
-      } else {
-        totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
-        LOG.debug("Size estimation of pattern {} by sampling took {} ms",
-            fileOrPatternSpec,
-            System.currentTimeMillis() - startTime);
-      }
-      return totalSize;
-    } else {
-      long start = getStartOffset();
-      long end = Math.min(getEndOffset(), getMaxEndOffset(options));
-      return end - start;
-    }
-  }
-
-  // Get the exact total size of the given set of files.
-  // Invokes multiple requests for size estimation in parallel using a thread pool.
-  // TODO: replace this with bulk request API when it is available. Will require updates
-  // to IOChannelFactory interface.
-  private static long getExactTotalSizeOfFiles(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
-    List<ListenableFuture<Long>> futures = new ArrayList<>();
-    ListeningExecutorService service =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-    try {
-      long totalSize = 0;
-      for (String file : files) {
-        futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
-      }
-
-      for (Long val : Futures.allAsList(futures).get()) {
-        totalSize += val;
-      }
-
-      return totalSize;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    }  finally {
-      service.shutdown();
-    }
-  }
-
-  private static ListenableFuture<Long> createFutureForSizeEstimation(
-      final String file,
-      final IOChannelFactory ioChannelFactory,
-      ListeningExecutorService service) {
-    return service.submit(
-        new Callable<Long>() {
-          @Override
-          public Long call() throws IOException {
-            return ioChannelFactory.getSizeBytes(file);
-          }
-        });
-  }
-
-  // Estimate the total size of the given set of files through sampling and extrapolation.
-  // Currently we use uniform sampling which requires a linear sampling size for a reasonable
-  // estimate.
-  // TODO: Implement a more efficient sampling mechanism.
-  private static long getEstimatedSizeOfFilesBySampling(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
-    int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
-    sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
-
-    List<String> selectedFiles = new ArrayList<String>(files);
-    Collections.shuffle(selectedFiles);
-    selectedFiles = selectedFiles.subList(0, sampleSize);
-
-    return files.size() * getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory)
-        / selectedFiles.size();
-  }
-
-  private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
-      final String file,
-      final long desiredBundleSizeBytes,
-      final PipelineOptions options,
-      ListeningExecutorService service) {
-    return service.submit(new Callable<List<? extends FileBasedSource<T>>>() {
-      @Override
-      public List<? extends FileBasedSource<T>> call() throws Exception {
-        return createForSubrangeOfFile(file, 0, Long.MAX_VALUE)
-            .splitIntoBundles(desiredBundleSizeBytes, options);
-      }
-    });
-  }
-
-  @Override
-  public final List<? extends FileBasedSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // This implementation of method splitIntoBundles is provided to simplify subclasses. Here we
-    // split a FileBasedSource based on a file pattern to FileBasedSources based on full single
-    // files. For files that can be efficiently seeked, we further split FileBasedSources based on
-    // those files to FileBasedSources based on sub ranges of single files.
-
-    if (mode == Mode.FILEPATTERN) {
-      long startTime = System.currentTimeMillis();
-      List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures = new ArrayList<>();
-
-      ListeningExecutorService service =
-          MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-      try {
-        for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
-          futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
-        }
-        List<? extends FileBasedSource<T>> splitResults =
-            ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
-        LOG.debug(
-            "Splitting the source based on file pattern {} took {} ms",
-            fileOrPatternSpec,
-            System.currentTimeMillis() - startTime);
-        return splitResults;
-      } finally {
-        service.shutdown();
-      }
-    } else {
-      if (isSplittable()) {
-        List<FileBasedSource<T>> splitResults = new ArrayList<>();
-        for (OffsetBasedSource<T> split :
-            super.splitIntoBundles(desiredBundleSizeBytes, options)) {
-          splitResults.add((FileBasedSource<T>) split);
-        }
-        return splitResults;
-      } else {
-        LOG.debug("The source for file {} is not split into sub-range based sources since "
-            + "the file is not seekable",
-            fileOrPatternSpec);
-        return ImmutableList.of(this);
-      }
-    }
-  }
-
-  /**
-   * Determines whether a file represented by this source can be split into bundles.
-   *
-   * <p>By default, a file is splittable if it is on a file system that supports efficient read
-   * seeking. Subclasses may override to provide different behavior.
-   */
-  protected boolean isSplittable() throws Exception {
-    // We split a file-based source into subranges only if the file is efficiently seekable.
-    // If a file is not efficiently seekable it would be highly inefficient to create and read a
-    // source based on a subrange of that file.
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    return factory.isReadSeekEfficient(fileOrPatternSpec);
-  }
-
-  @Override
-  public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    // Validate the current source prior to creating a reader for it.
-    this.validate();
-
-    if (mode == Mode.FILEPATTERN) {
-      long startTime = System.currentTimeMillis();
-      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
-      List<FileBasedReader<T>> fileReaders = new ArrayList<>();
-      for (String fileName : files) {
-        long endOffset;
-        try {
-          endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
-        } catch (IOException e) {
-          LOG.warn("Failed to get size of {}", fileName, e);
-          endOffset = Long.MAX_VALUE;
-        }
-        fileReaders.add(
-            createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
-      }
-      LOG.debug(
-          "Creating a reader for file pattern {} took {} ms",
-          fileOrPatternSpec,
-          System.currentTimeMillis() - startTime);
-      if (fileReaders.size() == 1) {
-        return fileReaders.get(0);
-      }
-      return new FilePatternReader(this, fileReaders);
-    } else {
-      return createSingleFileReader(options);
-    }
-  }
-
-  @Override
-  public String toString() {
-    switch (mode) {
-      case FILEPATTERN:
-        return fileOrPatternSpec;
-      case SINGLE_FILE_OR_SUBRANGE:
-        return fileOrPatternSpec + " range " + super.toString();
-      default:
-        throw new IllegalStateException("Unexpected mode: " + mode);
-    }
-  }
-
-  @Override
-  public void validate() {
-    super.validate();
-    switch (mode) {
-      case FILEPATTERN:
-        checkArgument(getStartOffset() == 0,
-            "FileBasedSource is based on a file pattern or a full single file "
-            + "but the starting offset proposed %s is not zero", getStartOffset());
-        checkArgument(getEndOffset() == Long.MAX_VALUE,
-            "FileBasedSource is based on a file pattern or a full single file "
-            + "but the ending offset proposed %s is not Long.MAX_VALUE", getEndOffset());
-        break;
-      case SINGLE_FILE_OR_SUBRANGE:
-        // Nothing more to validate.
-        break;
-      default:
-        throw new IllegalStateException("Unknown mode: " + mode);
-    }
-  }
-
-  @Override
-  public final long getMaxEndOffset(PipelineOptions options) throws IOException {
-    checkArgument(
-            mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
-    if (getEndOffset() == Long.MAX_VALUE) {
-      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-      return factory.getSizeBytes(fileOrPatternSpec);
-    } else {
-      return getEndOffset();
-    }
-  }
-
-  protected static final Collection<String> expandFilePattern(String fileOrPatternSpec)
-      throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    Collection<String> matches = factory.match(fileOrPatternSpec);
-    LOG.info("Matched {} files for pattern {}", matches.size(), fileOrPatternSpec);
-    return matches;
-  }
-
-  /**
-   * A {@link Source.Reader reader} that implements code common to readers of
-   * {@code FileBasedSource}s.
-   *
-   * <h2>Seekability</h2>
-   *
-   * <p>This reader uses a {@link ReadableByteChannel} created for the file represented by the
-   * corresponding source to efficiently move to the correct starting position defined in the
-   * source. Subclasses of this reader should implement {@link #startReading} to get access to this
-   * channel. If the source corresponding to the reader is for a subrange of a file the
-   * {@code ReadableByteChannel} provided is guaranteed to be an instance of the type
-   * {@link SeekableByteChannel}, which may be used by subclass to traverse back in the channel to
-   * determine the correct starting position.
-   *
-   * <h2>Reading Records</h2>
-   *
-   * <p>Sequential reading is implemented using {@link #readNextRecord}.
-   *
-   * <p>Then {@code FileBasedReader} implements "reading a range [A, B)" in the following way.
-   * <ol>
-   * <li>{@link #start} opens the file
-   * <li>{@link #start} seeks the {@code SeekableByteChannel} to A (reading offset ranges for
-   * non-seekable files is not supported) and calls {@code startReading()}
-   * <li>{@link #start} calls {@link #advance} once, which, via {@link #readNextRecord},
-   * locates the first record which is at a split point AND its offset is at or after A.
-   * If this record is at or after B, {@link #advance} returns false and reading is finished.
-   * <li>if the previous advance call returned {@code true} sequential reading starts and
-   * {@code advance()} will be called repeatedly
-   * </ol>
-   * {@code advance()} calls {@code readNextRecord()} on the subclass, and stops (returns false) if
-   * the new record is at a split point AND the offset of the new record is at or after B.
-   *
-   * <h2>Thread Safety</h2>
-   *
-   * <p>Since this class implements {@link Source.Reader} it guarantees thread safety. Abstract
-   * methods defined here will not be accessed by more than one thread concurrently.
-   */
-  public abstract static class FileBasedReader<T> extends OffsetBasedReader<T> {
-    private ReadableByteChannel channel = null;
-
-    /**
-     * Subclasses should not perform IO operations at the constructor. All IO operations should be
-     * delayed until the {@link #startReading} method is invoked.
-     */
-    public FileBasedReader(FileBasedSource<T> source) {
-      super(source);
-      checkArgument(source.getMode() != Mode.FILEPATTERN,
-          "FileBasedReader does not support reading file patterns");
-    }
-
-    @Override
-    public FileBasedSource<T> getCurrentSource() {
-      return (FileBasedSource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected final boolean startImpl() throws IOException {
-      FileBasedSource<T> source = getCurrentSource();
-      IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
-      this.channel = factory.open(source.getFileOrPatternSpec());
-
-      if (channel instanceof SeekableByteChannel) {
-        SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
-        seekChannel.position(source.getStartOffset());
-      } else {
-        // Channel is not seekable. Must not be a subrange.
-        checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
-            "Subrange-based sources must only be defined for file types that support seekable "
-            + " read channels");
-        checkArgument(source.getStartOffset() == 0,
-            "Start offset %s is not zero but channel for reading the file is not seekable.",
-            source.getStartOffset());
-      }
-
-      startReading(channel);
-
-      // Advance once to load the first record.
-      return advanceImpl();
-    }
-
-    @Override
-    protected final boolean advanceImpl() throws IOException {
-      return readNextRecord();
-    }
-
-    /**
-     * Closes any {@link ReadableByteChannel} created for the current reader. This implementation is
-     * idempotent. Any {@code close()} method introduced by a subclass must be idempotent and must
-     * call the {@code close()} method in the {@code FileBasedReader}.
-     */
-    @Override
-    public void close() throws IOException {
-      if (channel != null) {
-        channel.close();
-      }
-    }
-
-    /**
-     * Performs any initialization of the subclass of {@code FileBasedReader} that involves IO
-     * operations. Will only be invoked once and before that invocation the base class will seek the
-     * channel to the source's starting offset.
-     *
-     * <p>Provided {@link ReadableByteChannel} is for the file represented by the source of this
-     * reader. Subclass may use the {@code channel} to build a higher level IO abstraction, e.g., a
-     * BufferedReader or an XML parser.
-     *
-     * <p>If the corresponding source is for a subrange of a file, {@code channel} is guaranteed to
-     * be an instance of the type {@link SeekableByteChannel}.
-     *
-     * <p>After this method is invoked the base class will not be reading data from the channel or
-     * adjusting the position of the channel. But the base class is responsible for properly closing
-     * the channel.
-     *
-     * @param channel a byte channel representing the file backing the reader.
-     */
-    protected abstract void startReading(ReadableByteChannel channel) throws IOException;
-
-    /**
-     * Reads the next record from the channel provided by {@link #startReading}. Methods
-     * {@link #getCurrent}, {@link #getCurrentOffset}, and {@link #isAtSplitPoint()} should return
-     * the corresponding information about the record read by the last invocation of this method.
-     *
-     * <p>Note that this method will be called the same way for reading the first record in the
-     * source (file or offset range in the file) and for reading subsequent records. It is up to the
-     * subclass to do anything special for locating and reading the first record, if necessary.
-     *
-     * @return {@code true} if a record was successfully read, {@code false} if the end of the
-     *         channel was reached before successfully reading a new record.
-     */
-    protected abstract boolean readNextRecord() throws IOException;
-  }
-
-  // An internal Reader implementation that concatenates a sequence of FileBasedReaders.
-  private class FilePatternReader extends BoundedReader<T> {
-    private final FileBasedSource<T> source;
-    private final List<FileBasedReader<T>> fileReaders;
-    final ListIterator<FileBasedReader<T>> fileReadersIterator;
-    FileBasedReader<T> currentReader = null;
-
-    public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>> fileReaders) {
-      this.source = source;
-      this.fileReaders = fileReaders;
-      this.fileReadersIterator = fileReaders.listIterator();
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return startNextNonemptyReader();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      checkState(currentReader != null, "Call start() before advance()");
-      if (currentReader.advance()) {
-        return true;
-      }
-      return startNextNonemptyReader();
-    }
-
-    private boolean startNextNonemptyReader() throws IOException {
-      while (fileReadersIterator.hasNext()) {
-        currentReader = fileReadersIterator.next();
-        if (currentReader.start()) {
-          return true;
-        }
-        currentReader.close();
-      }
-      return false;
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      // A NoSuchElementException will be thrown by the last FileBasedReader if getCurrent() is
-      // called after advance() returns false.
-      return currentReader.getCurrent();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      // A NoSuchElementException will be thrown by the last FileBasedReader if
-      // getCurrentTimestamp() is called after advance() returns false.
-      return currentReader.getCurrentTimestamp();
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Close all readers that may have not yet been closed.
-      // If this reader has not been started, currentReader is null.
-      if (currentReader != null) {
-        currentReader.close();
-      }
-      while (fileReadersIterator.hasNext()) {
-        fileReadersIterator.next().close();
-      }
-    }
-
-    @Override
-    public FileBasedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public FileBasedSource<T> splitAtFraction(double fraction) {
-      // Unsupported. TODO: implement.
-      LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
-      return null;
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      if (currentReader == null) {
-        return 0.0;
-      }
-      if (fileReaders.isEmpty()) {
-        return 1.0;
-      }
-      int index = fileReadersIterator.previousIndex();
-      int numReaders = fileReaders.size();
-      if (index == numReaders) {
-        return 1.0;
-      }
-      double before = 1.0 * index / numReaders;
-      double after = 1.0 * (index + 1) / numReaders;
-      Double fractionOfCurrentReader = currentReader.getFractionConsumed();
-      if (fractionOfCurrentReader == null) {
-        return before;
-      }
-      return before + fractionOfCurrentReader * (after - before);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
deleted file mode 100644
index 288688e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;
-import com.google.cloud.dataflow.sdk.io.range.RangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link BoundedSource} that uses offsets to define starting and ending positions.
- *
- * <p>{@link OffsetBasedSource} is a common base class for all bounded sources where the input can
- * be represented as a single range, and an input can be efficiently processed in parallel by
- * splitting the range into a set of disjoint ranges whose union is the original range. This class
- * should be used for sources that can be cheaply read starting at any given offset.
- * {@link OffsetBasedSource} stores the range and implements splitting into bundles.
- *
- * <p>Extend {@link OffsetBasedSource} to implement your own offset-based custom source.
- * {@link FileBasedSource}, which is a subclass of this, adds additional functionality useful for
- * custom sources that are based on files. If possible implementors should start from
- * {@link FileBasedSource} instead of {@link OffsetBasedSource}.
- *
- * <p>Consult {@link RangeTracker} for important semantics common to all sources defined by a range
- * of positions of a certain type, including the semantics of split points
- * ({@link OffsetBasedReader#isAtSplitPoint}).
- *
- * @param <T> Type of records represented by the source.
- * @see BoundedSource
- * @see FileBasedSource
- * @see RangeTracker
- */
-public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
-  private final long startOffset;
-  private final long endOffset;
-  private final long minBundleSize;
-
-  /**
-   * @param startOffset starting offset (inclusive) of the source. Must be non-negative.
-   *
-   * @param endOffset ending offset (exclusive) of the source. Use {@link Long#MAX_VALUE} to
-   *        indicate that the entire source after {@code startOffset} should be read. Must be
-   *        {@code > startOffset}.
-   *
-   * @param minBundleSize minimum bundle size in offset units that should be used when splitting the
-   *                      source into sub-sources. This value may not be respected if the total
-   *                      range of the source is smaller than the specified {@code minBundleSize}.
-   *                      Must be non-negative.
-   */
-  public OffsetBasedSource(long startOffset, long endOffset, long minBundleSize) {
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
-    this.minBundleSize = minBundleSize;
-  }
-
-  /**
-   * Returns the starting offset of the source.
-   */
-  public long getStartOffset() {
-    return startOffset;
-  }
-
-  /**
-   * Returns the specified ending offset of the source. Any returned value greater than or equal to
-   * {@link #getMaxEndOffset(PipelineOptions)} should be treated as
-   * {@link #getMaxEndOffset(PipelineOptions)}.
-   */
-  public long getEndOffset() {
-    return endOffset;
-  }
-
-  /**
-   * Returns the minimum bundle size that should be used when splitting the source into sub-sources.
-   * This value may not be respected if the total range of the source is smaller than the specified
-   * {@code minBundleSize}.
-   */
-  public long getMinBundleSize() {
-    return minBundleSize;
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    long trueEndOffset = (endOffset == Long.MAX_VALUE) ? getMaxEndOffset(options) : endOffset;
-    return getBytesPerOffset() * (trueEndOffset - getStartOffset());
-  }
-
-  @Override
-  public List<? extends OffsetBasedSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
-    // make sure that we do not end up with a too small bundle at the end. If the desired bundle
-    // size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
-
-    long desiredBundleSizeOffsetUnits = Math.max(
-        Math.max(1, desiredBundleSizeBytes / getBytesPerOffset()),
-        minBundleSize);
-
-    List<OffsetBasedSource<T>> subSources = new ArrayList<>();
-    long start = startOffset;
-    long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
-
-    while (start < maxEnd) {
-      long end = start + desiredBundleSizeOffsetUnits;
-      end = Math.min(end, maxEnd);
-      // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
-      long remaining = maxEnd - end;
-      if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
-        end = maxEnd;
-      }
-      subSources.add(createSourceForSubrange(start, end));
-
-      start = end;
-    }
-    return subSources;
-  }
-
-  @Override
-  public void validate() {
-    Preconditions.checkArgument(
-        this.startOffset >= 0,
-        "Start offset has value %s, must be non-negative", this.startOffset);
-    Preconditions.checkArgument(
-        this.endOffset >= 0,
-        "End offset has value %s, must be non-negative", this.endOffset);
-    Preconditions.checkArgument(
-        this.startOffset < this.endOffset,
-        "Start offset %s must be before end offset %s",
-        this.startOffset, this.endOffset);
-    Preconditions.checkArgument(
-        this.minBundleSize >= 0,
-        "minBundleSize has value %s, must be non-negative",
-        this.minBundleSize);
-  }
-
-  @Override
-  public String toString() {
-    return "[" + startOffset + ", " + endOffset + ")";
-  }
-
-  /**
-   * Returns approximately how many bytes of data correspond to a single offset in this source.
-   * Used for translation between this source's range and methods defined in terms of bytes, such
-   * as {@link #getEstimatedSizeBytes} and {@link #splitIntoBundles}.
-   *
-   * <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources.
-   */
-  public long getBytesPerOffset() {
-    return 1L;
-  }
-
-  /**
-   * Returns the actual ending offset of the current source. The value returned by this function
-   * will be used to clip the end of the range {@code [startOffset, endOffset)} such that the
-   * range used is {@code [startOffset, min(endOffset, maxEndOffset))}.
-   *
-   * <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose
-   * that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to
-   * indicate that a file should be read to the end. Then {@link #getMaxEndOffset} should determine
-   * the actual, exact size of the file in bytes and return it.
-   */
-  public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;
-
-  /**
-   * Returns an {@link OffsetBasedSource} for a subrange of the current source. The
-   * subrange {@code [start, end)} must be within the range {@code [startOffset, endOffset)} of
-   * the current source, i.e. {@code startOffset <= start < end <= endOffset}.
-   */
-  public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
-
-  /**
-   * Whether this source should allow dynamic splitting of the offset ranges.
-   *
-   * <p>True by default. Override this to return false if the source cannot
-   * support dynamic splitting correctly. If this returns false,
-   * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
-   */
-  public boolean allowsDynamicSplitting() {
-    return true;
-  }
-
-  /**
-   * A {@link Source.Reader} that implements code common to readers of all
-   * {@link OffsetBasedSource}s.
-   *
-   * <p>Subclasses have to implement:
-   * <ul>
-   *   <li>The methods {@link #startImpl} and {@link #advanceImpl} for reading the
-   *   first or subsequent records.
-   *   <li>The methods {@link #getCurrent}, {@link #getCurrentOffset}, and optionally
-   *   {@link #isAtSplitPoint} and {@link #getCurrentTimestamp} to access properties of
-   *   the last record successfully read by {@link #startImpl} or {@link #advanceImpl}.
-   * </ul>
-   */
-  public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
-
-    private OffsetBasedSource<T> source;
-
-    /** The {@link OffsetRangeTracker} managing the range and current position of the source. */
-    private final OffsetRangeTracker rangeTracker;
-
-    /**
-     * @param source the {@link OffsetBasedSource} to be read by the current reader.
-     */
-    public OffsetBasedReader(OffsetBasedSource<T> source) {
-      this.source = source;
-      this.rangeTracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
-    }
-
-    /**
-     * Returns the <i>starting</i> offset of the {@link Source.Reader#getCurrent current record},
-     * which has been read by the last successful {@link Source.Reader#start} or
-     * {@link Source.Reader#advance} call.
-     * <p>If no such call has been made yet, the return value is unspecified.
-     * <p>See {@link RangeTracker} for description of offset semantics.
-     */
-    protected abstract long getCurrentOffset() throws NoSuchElementException;
-
-    /**
-     * Returns whether the current record is at a split point (i.e., whether the current record
-     * would be the first record to be read by a source with a specified start offset of
-     * {@link #getCurrentOffset}).
-     *
-     * <p>See detailed documentation about split points in {@link RangeTracker}.
-     */
-    protected boolean isAtSplitPoint() throws NoSuchElementException {
-      return true;
-    }
-
-    @Override
-    public final boolean start() throws IOException {
-      return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
-    }
-
-    @Override
-    public final boolean advance() throws IOException {
-      return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
-    }
-
-    /**
-     * Initializes the {@link OffsetBasedSource.OffsetBasedReader} and advances to the first record,
-     * returning {@code true} if there is a record available to be read. This method will be
-     * invoked exactly once and may perform expensive setup operations that are needed to
-     * initialize the reader.
-     *
-     * <p>This function is the {@code OffsetBasedReader} implementation of
-     * {@link BoundedReader#start}. The key difference is that the implementor can ignore the
-     * possibility that it should no longer produce the first record, either because it has exceeded
-     * the original {@code endOffset} assigned to the reader, or because a concurrent call to
-     * {@link #splitAtFraction} has changed the source to shrink the offset range being read.
-     *
-     * @see BoundedReader#start
-     */
-    protected abstract boolean startImpl() throws IOException;
-
-    /**
-     * Advances to the next record and returns {@code true}, or returns {@code false} if there
-     * is no next record.
-     *
-     * <p>This function is the {@code OffsetBasedReader} implementation of
-     * {@link BoundedReader#advance}. The key difference is that the implementor can ignore the
-     * possibility that it should no longer produce the next record, either because it has exceeded
-     * the original {@code endOffset} assigned to the reader, or because a concurrent call to
-     * {@link #splitAtFraction} has changed the source to shrink the offset range being read.
-     *
-     * @see BoundedReader#advance
-     */
-    protected abstract boolean advanceImpl() throws IOException;
-
-    @Override
-    public synchronized OffsetBasedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      return rangeTracker.getFractionConsumed();
-    }
-
-    @Override
-    public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
-      if (!getCurrentSource().allowsDynamicSplitting()) {
-        return null;
-      }
-      if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {
-        LOG.debug(
-            "Refusing to split unbounded OffsetBasedReader {} at fraction {}",
-            rangeTracker, fraction);
-        return null;
-      }
-      long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
-      LOG.debug(
-          "Proposing to split OffsetBasedReader {} at fraction {} (offset {})",
-          rangeTracker, fraction, splitOffset);
-      if (!rangeTracker.trySplitAtPosition(splitOffset)) {
-        return null;
-      }
-      long start = source.getStartOffset();
-      long end = source.getEndOffset();
-      OffsetBasedSource<T> primary = source.createSourceForSubrange(start, splitOffset);
-      OffsetBasedSource<T> residual = source.createSourceForSubrange(splitOffset, end);
-      this.source = primary;
-      return residual;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
deleted file mode 100644
index e5b8a39..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.api.client.repackaged.com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * A helper interface for talking to Pubsub via an underlying transport.
- */
-public interface PubsubClient extends AutoCloseable {
-  /**
-   * Path representing a cloud project id.
-   */
-  class ProjectPath implements Serializable {
-    private final String path;
-
-    public ProjectPath(String path) {
-      this.path = path;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      ProjectPath that = (ProjectPath) o;
-
-      return path.equals(that.path);
-
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-
-    public static ProjectPath fromId(String projectId) {
-      return new ProjectPath(String.format("projects/%s", projectId));
-    }
-  }
-
-  /**
-   * Path representing a Pubsub subscription.
-   */
-  class SubscriptionPath implements Serializable {
-    private final String path;
-
-    public SubscriptionPath(String path) {
-      this.path = path;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public String getV1Beta1Path() {
-      String[] splits = path.split("/");
-      Preconditions.checkState(splits.length == 4);
-      return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      SubscriptionPath that = (SubscriptionPath) o;
-      return path.equals(that.path);
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-
-    public static SubscriptionPath fromName(String projectId, String subscriptionName) {
-      return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
-          projectId, subscriptionName));
-    }
-  }
-
-  /**
-   * Path representing a Pubsub topic.
-   */
-  class TopicPath implements Serializable {
-    private final String path;
-
-    public TopicPath(String path) {
-      this.path = path;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public String getV1Beta1Path() {
-      String[] splits = path.split("/");
-      Preconditions.checkState(splits.length == 4);
-      return String.format("/topics/%s/%s", splits[1], splits[3]);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      TopicPath topicPath = (TopicPath) o;
-      return path.equals(topicPath.path);
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-
-    public static TopicPath fromName(String projectId, String topicName) {
-      return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
-    }
-  }
-
-  /**
-   * A message to be sent to Pubsub.
-   */
-  class OutgoingMessage {
-    /**
-     * Underlying (encoded) element.
-     */
-    public final byte[] elementBytes;
-
-    /**
-     * Timestamp for element (ms since epoch).
-     */
-    public final long timestampMsSinceEpoch;
-
-    public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
-      this.elementBytes = elementBytes;
-      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
-    }
-  }
-
-  /**
-   * A message received from Pubsub.
-   */
-  class IncomingMessage {
-    /**
-     * Underlying (encoded) element.
-     */
-    public final byte[] elementBytes;
-
-    /**
-     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
-     * or the custom timestamp associated with the message.
-     */
-    public final long timestampMsSinceEpoch;
-
-    /**
-     * Timestamp (in system time) at which we requested the message (ms since epoch).
-     */
-    public final long requestTimeMsSinceEpoch;
-
-    /**
-     * Id to pass back to Pubsub to acknowledge receipt of this message.
-     */
-    public final String ackId;
-
-    /**
-     * Id to pass to the runner to distinguish this message from all others.
-     */
-    public final byte[] recordId;
-
-    public IncomingMessage(
-        byte[] elementBytes,
-        long timestampMsSinceEpoch,
-        long requestTimeMsSinceEpoch,
-        String ackId,
-        byte[] recordId) {
-      this.elementBytes = elementBytes;
-      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
-      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
-      this.ackId = ackId;
-      this.recordId = recordId;
-    }
-  }
-
-  /**
-   * Gracefully close the underlying transport.
-   */
-  @Override
-  void close();
-
-
-  /**
-   * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
-   * published.
-   *
-   * @throws IOException
-   */
-  int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException;
-
-  /**
-   * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
-   * Return the received messages, or empty collection if none were available. Does not
-   * wait for messages to arrive. Returned messages will record their request time
-   * as {@code requestTimeMsSinceEpoch}.
-   *
-   * @throws IOException
-   */
-  Collection<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize)
-      throws IOException;
-
-  /**
-   * Acknowledge messages from {@code subscription} with {@code ackIds}.
-   *
-   * @throws IOException
-   */
-  void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds) throws IOException;
-
-  /**
-   * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
-   * be {@code deadlineSeconds} from now.
-   *
-   * @throws IOException
-   */
-  void modifyAckDeadline(
-      SubscriptionPath subscription, Iterable<String> ackIds,
-      int deadlineSeconds)
-      throws IOException;
-
-  /**
-   * Create {@code topic}.
-   *
-   * @throws IOException
-   */
-  void createTopic(TopicPath topic) throws IOException;
-
-  /**
-   * Delete {@code topic}.
-   *
-   * @throws IOException
-   */
-  void deleteTopic(TopicPath topic) throws IOException;
-
-  /**
-   * Return a list of topics for {@code project}.
-   *
-   * @throws IOException
-   */
-  Collection<TopicPath> listTopics(ProjectPath project) throws IOException;
-
-  /**
-   * Create {@code subscription} to {@code topic}.
-   *
-   * @throws IOException
-   */
-  void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException;
-
-  /**
-   * Delete {@code subscription}.
-   *
-   * @throws IOException
-   */
-  void deleteSubscription(SubscriptionPath subscription) throws IOException;
-
-  /**
-   * Return a list of subscriptions for {@code topic} in {@code project}.
-   *
-   * @throws IOException
-   */
-  Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
deleted file mode 100644
index 6e34705..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.api.client.util.DateTime;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.DeleteSubscriptionRequest;
-import com.google.pubsub.v1.DeleteTopicRequest;
-import com.google.pubsub.v1.ListSubscriptionsRequest;
-import com.google.pubsub.v1.ListSubscriptionsResponse;
-import com.google.pubsub.v1.ListTopicsRequest;
-import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
-import com.google.pubsub.v1.Subscription;
-import com.google.pubsub.v1.Topic;
-import io.grpc.Channel;
-import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
-import io.grpc.auth.ClientAuthInterceptor;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for talking to Pubsub via grpc.
- */
-public class PubsubGrpcClient implements PubsubClient {
-  private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
-  private static final int PUBSUB_PORT = 443;
-  private static final List<String> PUBSUB_SCOPES =
-      Collections.singletonList("https://www.googleapis.com/auth/pubsub");
-  private static final int LIST_BATCH_SIZE = 1000;
-
-  /**
-   * Timeout for grpc calls (in s).
-   */
-  private static final int TIMEOUT_S = 15;
-
-  /**
-   * Underlying netty channel, or {@literal null} if closed.
-   */
-  @Nullable
-  private ManagedChannel publisherChannel;
-
-  /**
-   * Credentials determined from options and environment.
-   */
-  private final GoogleCredentials credentials;
-
-  /**
-   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
-   * instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
-   */
-  @Nullable
-  private final String idLabel;
-
-  /**
-   * Cached stubs, or null if not cached.
-   */
-  @Nullable
-  private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
-  private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
-
-  private PubsubGrpcClient(
-      @Nullable String timestampLabel, @Nullable String idLabel,
-      ManagedChannel publisherChannel, GoogleCredentials credentials) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.publisherChannel = publisherChannel;
-    this.credentials = credentials;
-  }
-
-  /**
-   * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order
-   * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources
-   * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use
-   * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within
-   * message metadata.
-   */
-  public static PubsubGrpcClient newClient(
-      @Nullable String timestampLabel, @Nullable String idLabel,
-      GcpOptions options) throws IOException {
-    ManagedChannel channel = NettyChannelBuilder
-        .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
-        .negotiationType(NegotiationType.TLS)
-        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
-        .build();
-    // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
-    // various command line options. It currently only supports the older
-    // com.google.api.client.auth.oauth2.Credentials.
-    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
-    return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials);
-  }
-
-  /**
-   * Gracefully close the underlying netty channel.
-   */
-  @Override
-  public void close() {
-    Preconditions.checkState(publisherChannel != null, "Client has already been closed");
-    publisherChannel.shutdown();
-    try {
-      publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      // Stop waiting, but restore the interrupt flag for the caller.
-      Thread.currentThread().interrupt();
-    }
-    publisherChannel = null;
-    cachedPublisherStub = null;
-    cachedSubscriberStub = null;
-  }
-
-  /**
-   * Return channel with interceptor for returning credentials.
-   */
-  private Channel newChannel() throws IOException {
-    Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
-    ClientAuthInterceptor interceptor =
-        new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
-    return ClientInterceptors.intercept(publisherChannel, interceptor);
-  }
-
-  /**
-   * Return a stub for making a publish request with a timeout.
-   */
-  private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException {
-    if (cachedPublisherStub == null) {
-      cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
-    }
-    return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Return a stub for making a subscribe request with a timeout.
-   */
-  private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException {
-    if (cachedSubscriberStub == null) {
-      cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
-    }
-    return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages)
-      throws IOException {
-    PublishRequest.Builder request = PublishRequest.newBuilder()
-                                                   .setTopic(topic.getPath());
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage.Builder message =
-          PubsubMessage.newBuilder()
-                       .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
-
-      if (timestampLabel != null) {
-        message.getMutableAttributes()
-               .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idLabel != null) {
-        message.getMutableAttributes()
-               .put(idLabel,
-                   Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
-      }
-
-      request.addMessages(message);
-    }
-
-    PublishResponse response = publisherStub().publish(request.build());
-    return response.getMessageIdsCount();
-  }
-
-  /**
-   * Pull up to {@code batchSize} messages from {@code subscription}, asking the service to
-   * return immediately, and convert them to {@link IncomingMessage}s.
-   *
-   * <p>Each message's timestamp defaults to its Pubsub publish time in milliseconds since epoch.
-   * If {@code timestampLabel} is set and the message carries a non-empty attribute under it, the
-   * attribute value is parsed first as milliseconds since epoch and then as an RFC 3339 string;
-   * if neither parse succeeds the publish time is kept.
-   *
-   * <p>Each message's record id is the UTF-8 encoding of the {@code idLabel} attribute if that
-   * label is set and the attribute is non-empty, otherwise of the Pubsub message id.
-   *
-   * @param requestTimeMsSinceEpoch passed through unchanged to each {@link IncomingMessage}
-   * @param subscription subscription to pull from
-   * @param batchSize maximum number of messages to request
-   * @return the received messages, or an empty list if the response contained none
-   * @throws IOException declared by the interface; propagated from obtaining the stub
-   * @throws IllegalStateException if a received message has a null or empty ack id
-   */
-  @Override
-  public Collection<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize) throws IOException {
-    PullRequest request = PullRequest.newBuilder()
-                                     .setSubscription(subscription.getPath())
-                                     .setReturnImmediately(true)
-                                     .setMaxMessages(batchSize)
-                                     .build();
-    PullResponse response = subscriberStub().pull(request);
-    if (response.getReceivedMessagesCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
-    for (ReceivedMessage message : response.getReceivedMessagesList()) {
-      PubsubMessage pubsubMessage = message.getMessage();
-      Map<String, String> attributes = pubsubMessage.getAttributes();
-
-      // Payload.
-      byte[] elementBytes = pubsubMessage.getData().toByteArray();
-
-      // Timestamp.
-      // Start with Pubsub processing time. The proto Timestamp is a (seconds, nanos) pair, so
-      // both parts must be scaled to milliseconds before they are added.
-      Timestamp timestampProto = pubsubMessage.getPublishTime();
-      long timestampMsSinceEpoch =
-          timestampProto.getSeconds() * 1000L + timestampProto.getNanos() / 1000000L;
-      if (timestampLabel != null && attributes != null) {
-        String timestampString = attributes.get(timestampLabel);
-        if (timestampString != null && !timestampString.isEmpty()) {
-          try {
-            // Try parsing as milliseconds since epoch first.
-            timestampMsSinceEpoch = Long.parseLong(timestampString);
-          } catch (NumberFormatException notMillis) {
-            try {
-              // Not a plain number: try parsing as an RFC 3339 string instead.
-              timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue();
-            } catch (IllegalArgumentException ignored) {
-              // Neither format matched: deliberately fall back to Pubsub processing time.
-            }
-          }
-        }
-        // else: fallback to Pubsub processing time.
-      }
-      // else: fallback to Pubsub processing time.
-
-      // Ack id.
-      String ackId = message.getAckId();
-      Preconditions.checkState(ackId != null && !ackId.isEmpty());
-
-      // Record id, if any. Encode with an explicit charset so that the resulting bytes do not
-      // depend on the platform default of the JVM doing the pull.
-      @Nullable byte[] recordId = null;
-      if (idLabel != null && attributes != null) {
-        String recordIdString = attributes.get(idLabel);
-        if (recordIdString != null && !recordIdString.isEmpty()) {
-          recordId = recordIdString.getBytes(java.nio.charset.StandardCharsets.UTF_8);
-        }
-      }
-      if (recordId == null) {
-        recordId =
-            pubsubMessage.getMessageId().getBytes(java.nio.charset.StandardCharsets.UTF_8);
-      }
-
-      incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
-          requestTimeMsSinceEpoch, ackId, recordId));
-    }
-    return incomingMessages;
-  }
-
-  /**
-   * Acknowledge the messages identified by {@code ackIds} on {@code subscription}.
-   */
-  @Override
-  public void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds)
-      throws IOException {
-    AcknowledgeRequest.Builder builder = AcknowledgeRequest.newBuilder();
-    builder.setSubscription(subscription.getPath());
-    builder.addAllAckIds(ackIds);
-    // The call yields an Empty result, which is intentionally discarded.
-    subscriberStub().acknowledge(builder.build());
-  }
-
-  /**
-   * Set the ack deadline of the messages identified by {@code ackIds} on {@code subscription}
-   * to {@code deadlineSeconds}.
-   */
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, Iterable<String> ackIds, int
-      deadlineSeconds)
-      throws IOException {
-    ModifyAckDeadlineRequest.Builder builder = ModifyAckDeadlineRequest.newBuilder();
-    builder.setSubscription(subscription.getPath());
-    builder.addAllAckIds(ackIds);
-    builder.setAckDeadlineSeconds(deadlineSeconds);
-    // The call yields an Empty result, which is intentionally discarded.
-    subscriberStub().modifyAckDeadline(builder.build());
-  }
-
-  /**
-   * Create a topic named by {@code topic}'s path.
-   */
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    Topic.Builder builder = Topic.newBuilder();
-    builder.setName(topic.getPath());
-    // The call yields the created Topic, which is intentionally discarded.
-    publisherStub().createTopic(builder.build());
-  }
-
-  /**
-   * Delete the topic named by {@code topic}'s path.
-   */
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    DeleteTopicRequest.Builder builder = DeleteTopicRequest.newBuilder();
-    builder.setTopic(topic.getPath());
-    // The call yields an Empty result, which is intentionally discarded.
-    publisherStub().deleteTopic(builder.build());
-  }
-
-  @Override
-  public Collection<TopicPath> listTopics(ProjectPath project) throws IOException {
-    ListTopicsRequest.Builder request =
-        ListTopicsRequest.newBuilder()
-                         .setProject(project.getPath())
-                         .setPageSize(LIST_BATCH_SIZE);
-    ListTopicsResponse response = publisherStub().listTopics(request.build());
-    if (response.getTopicsCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
-    while (true) {
-      for (Topic topic : response.getTopicsList()) {
-        topics.add(new TopicPath(topic.getName()));
-      }
-      if (response.getNextPageToken().isEmpty()) {
-        break;
-      }
-      request.setPageToken(response.getNextPageToken());
-      response = publisherStub().listTopics(request.build());
-    }
-    return topics;
-  }
-
-  /**
-   * Create a subscription named by {@code subscription}'s path on {@code topic}, with the given
-   * {@code ackDeadlineSeconds}.
-   */
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException {
-    Subscription.Builder builder = Subscription.newBuilder();
-    builder.setTopic(topic.getPath());
-    builder.setName(subscription.getPath());
-    builder.setAckDeadlineSeconds(ackDeadlineSeconds);
-    // The call yields the created Subscription, which is intentionally discarded.
-    subscriberStub().createSubscription(builder.build());
-  }
-
-  /**
-   * Delete the subscription named by {@code subscription}'s path.
-   */
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    DeleteSubscriptionRequest.Builder builder = DeleteSubscriptionRequest.newBuilder();
-    builder.setSubscription(subscription.getPath());
-    // The call yields an Empty result, which is intentionally discarded.
-    subscriberStub().deleteSubscription(builder.build());
-  }
-
-  @Override
-  public Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException {
-    ListSubscriptionsRequest.Builder request =
-        ListSubscriptionsRequest.newBuilder()
-                                .setProject(project.getPath())
-                                .setPageSize(LIST_BATCH_SIZE);
-    ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
-    if (response.getSubscriptionsCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
-    while (true) {
-      for (Subscription subscription : response.getSubscriptionsList()) {
-        if (subscription.getTopic().equals(topic.getPath())) {
-          subscriptions.add(new SubscriptionPath(subscription.getName()));
-        }
-      }
-      if (response.getNextPageToken().isEmpty()) {
-        break;
-      }
-      request.setPageToken(response.getNextPageToken());
-      response = subscriberStub().listSubscriptions(request.build());
-    }
-    return subscriptions;
-  }
-}