You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:02 UTC
[15/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
deleted file mode 100644
index 3ad32b4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-
-/**
- * A common base class for all file-based {@link Source}s. Extend this class to implement your own
- * file-based custom source.
- *
- * <p>A file-based {@code Source} is a {@code Source} backed by a file pattern defined as a Java
- * glob, a single file, or an offset range for a single file. See {@link OffsetBasedSource} and
- * {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} for semantics of offset ranges.
- *
- * <p>This source stores a {@code String} that is an {@link IOChannelFactory} specification for a
- * file or file pattern. There should be an {@code IOChannelFactory} defined for the file
- * specification provided. Please refer to {@link IOChannelUtils} and {@link IOChannelFactory} for
- * more information on this.
- *
- * <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
- * methods to create a sub-source and a reader for a range of a single file -
- * {@link #createForSubrangeOfFile} and {@link #createSingleFileReader}. Please refer to
- * {@link XmlSource} for an example implementation of {@code FileBasedSource}.
- *
- * @param <T> Type of records represented by the source.
- */
-public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
- private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
- private static final float FRACTION_OF_FILES_TO_STAT = 0.01f; // Sample rate for size estimates.
-
- // Package-private for testing
- static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100; // Above this, sizes are sampled.
-
- // Size of the thread pool to be used for performing file operations in parallel.
- // Package-private for testing.
- static final int THREAD_POOL_SIZE = 128;
-
- private final String fileOrPatternSpec; // File name or file pattern, as given to a constructor.
- private final Mode mode; // Fixed by whichever constructor was used.
-
- /**
- * A given {@code FileBasedSource} represents a file resource of one of these types.
- *
- * <p>{@code FILEPATTERN} is assigned by the two-argument constructor and
- * {@code SINGLE_FILE_OR_SUBRANGE} by the four-argument (offset range) constructor.
- */
- public enum Mode {
- FILEPATTERN,
- SINGLE_FILE_OR_SUBRANGE
- }
-
- /**
- * Create a {@code FileBasedSource} based on a file or a file pattern specification. This
- * constructor must be used when creating a new {@code FileBasedSource} for a file pattern.
- *
- * <p>The resulting source is in {@link Mode#FILEPATTERN} mode, with a start offset of 0 and an
- * end offset of {@code Long.MAX_VALUE}.
- *
- * <p>See {@link OffsetBasedSource} for a detailed description of {@code minBundleSize}.
- *
- * @param fileOrPatternSpec {@link IOChannelFactory} specification of file or file pattern
- * represented by the {@link FileBasedSource}.
- * @param minBundleSize minimum bundle size in bytes.
- */
- public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
- super(0, Long.MAX_VALUE, minBundleSize);
- mode = Mode.FILEPATTERN;
- this.fileOrPatternSpec = fileOrPatternSpec;
- }
-
- /**
- * Create a {@code FileBasedSource} based on a single file. This constructor must be used when
- * creating a new {@code FileBasedSource} for a subrange of a single file.
- * Additionally, this constructor must be used to create new {@code FileBasedSource}s when
- * subclasses implement the method {@link #createForSubrangeOfFile}.
- *
- * <p>The resulting source is in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode.
- *
- * <p>See {@link OffsetBasedSource} for detailed descriptions of {@code minBundleSize},
- * {@code startOffset}, and {@code endOffset}.
- *
- * @param fileName {@link IOChannelFactory} specification of the file represented by the
- * {@link FileBasedSource}.
- * @param minBundleSize minimum bundle size in bytes.
- * @param startOffset starting byte offset.
- * @param endOffset ending byte offset. If the specified value {@code >= #getMaxEndOffset()} it
- * implies {@code #getMaxEndOffset()}.
- */
- public FileBasedSource(String fileName, long minBundleSize,
- long startOffset, long endOffset) {
- super(startOffset, endOffset, minBundleSize);
- mode = Mode.SINGLE_FILE_OR_SUBRANGE;
- this.fileOrPatternSpec = fileName;
- }
-
- public final String getFileOrPatternSpec() { // Returns the spec passed to the constructor.
- return fileOrPatternSpec;
- }
-
- public final Mode getMode() { // Returns whether this is a pattern or a single file/subrange.
- return mode;
- }
-
- @Override
- public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
- checkArgument(mode != Mode.FILEPATTERN, // Offsets are meaningless across multiple files.
- "Cannot split a file pattern based source based on positions");
- checkArgument(start >= getStartOffset(), // The subrange must lie inside this source's range.
- "Start offset value %s of the subrange cannot be smaller than the start offset value %s"
- + " of the parent source",
- start,
- getStartOffset());
- checkArgument(end <= getEndOffset(),
- "End offset value %s of the subrange cannot be larger than the end offset value %s",
- end,
- getEndOffset());
-
- FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end); // Subclass hook.
- if (start > 0 || end != Long.MAX_VALUE) { // Only a proper subrange is required to be in subrange mode.
- checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
- "Source created for the range [%s,%s) must be a subrange source", start, end);
- }
- return source;
- }
-
- /**
- * Creates and returns a new {@code FileBasedSource} of the same type as the current
- * {@code FileBasedSource} backed by a given file and an offset range. When the current source
- * is being split, this method is used to generate new sub-sources. When creating the source,
- * subclasses must call the constructor {@link #FileBasedSource(String, long, long, long)} of
- * {@code FileBasedSource} with corresponding parameter values passed here.
- *
- * @param fileName file backing the new {@code FileBasedSource}.
- * @param start starting byte offset of the new {@code FileBasedSource}.
- * @param end ending byte offset of the new {@code FileBasedSource}. May be Long.MAX_VALUE,
- * in which case it will be inferred using {@link #getMaxEndOffset}.
- * @return the new source covering the requested range of {@code fileName}.
- */
- protected abstract FileBasedSource<T> createForSubrangeOfFile(
- String fileName, long start, long end);
-
- /**
- * Creates and returns an instance of a {@code FileBasedReader} implementation for the current
- * source assuming the source represents a single file. File patterns will be handled by
- * {@code FileBasedSource} implementation automatically.
- *
- * @param options the pipeline options that were passed to {@link #createReader}.
- * @return a reader for the single file (or subrange of it) represented by this source.
- */
- protected abstract FileBasedReader<T> createSingleFileReader(
- PipelineOptions options);
-
- @Override
- public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
- // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
- // we perform the size estimation of files and file patterns using the interface provided by
- // IOChannelFactory.
-
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
- if (mode == Mode.FILEPATTERN) {
- // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
- long startTime = System.currentTimeMillis();
- long totalSize = 0;
- Collection<String> inputs = factory.match(fileOrPatternSpec);
- if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) { // Few files: stat every one.
- totalSize = getExactTotalSizeOfFiles(inputs, factory);
- LOG.debug("Size estimation of all files of pattern {} took {} ms",
- fileOrPatternSpec,
- System.currentTimeMillis() - startTime);
- } else { // Many files: stat a random sample and extrapolate.
- totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
- LOG.debug("Size estimation of pattern {} by sampling took {} ms",
- fileOrPatternSpec,
- System.currentTimeMillis() - startTime);
- }
- return totalSize;
- } else {
- long start = getStartOffset();
- long end = Math.min(getEndOffset(), getMaxEndOffset(options)); // Clamps a Long.MAX_VALUE end.
- return end - start;
- }
- }
-
- // Get the exact total size, in bytes, of the given set of files.
- // Invokes multiple requests for size estimation in parallel using a thread pool.
- // TODO: replace this with bulk request API when it is available. Will require updates
- // to IOChannelFactory interface.
- private static long getExactTotalSizeOfFiles(
- Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
- List<ListenableFuture<Long>> futures = new ArrayList<>();
- ListeningExecutorService service = // A fresh pool per call; shut down in the finally below.
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
- try {
- long totalSize = 0;
- for (String file : files) {
- futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
- }
-
- for (Long val : Futures.allAsList(futures).get()) { // Blocks until all lookups are done.
- totalSize += val;
- }
-
- return totalSize;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // Restore the interrupt flag before rethrowing.
- throw new IOException(e);
- } catch (ExecutionException e) {
- throw new IOException(e.getCause()); // Surface the lookup's own failure as the cause.
- } finally {
- service.shutdown();
- }
- }
-
- private static ListenableFuture<Long> createFutureForSizeEstimation(
- final String file,
- final IOChannelFactory ioChannelFactory,
- ListeningExecutorService service) {
- return service.submit( // Schedules the size lookup for one file on the supplied executor.
- new Callable<Long>() {
- @Override
- public Long call() throws IOException {
- return ioChannelFactory.getSizeBytes(file);
- }
- });
- }
-
- // Estimate the total size of the given set of files through sampling and extrapolation.
- // Currently we use uniform sampling which requires a linear sampling size for a reasonable
- // estimate.
- // TODO: Implement a more efficient sampling mechanism.
- private static long getEstimatedSizeOfFilesBySampling(
- Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
- int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
- sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize); // Lower bound.
-
- List<String> selectedFiles = new ArrayList<String>(files); // Copy, so the input is not shuffled.
- Collections.shuffle(selectedFiles);
- selectedFiles = selectedFiles.subList(0, sampleSize); // NOTE(review): needs sampleSize <= files.size(); the visible caller ensures it.
-
- return files.size() * getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory) // Scale up.
- / selectedFiles.size();
- }
-
- private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
- final String file,
- final long desiredBundleSizeBytes,
- final PipelineOptions options,
- ListeningExecutorService service) {
- return service.submit(new Callable<List<? extends FileBasedSource<T>>>() {
- @Override
- public List<? extends FileBasedSource<T>> call() throws Exception {
- return createForSubrangeOfFile(file, 0, Long.MAX_VALUE) // Whole-file source for one match.
- .splitIntoBundles(desiredBundleSizeBytes, options); // Then let that source split itself.
- }
- });
- }
-
- @Override
- public final List<? extends FileBasedSource<T>> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- // This implementation of method splitIntoBundles is provided to simplify subclasses. Here we
- // split a FileBasedSource based on a file pattern to FileBasedSources based on full single
- // files. For files that can be efficiently seeked, we further split FileBasedSources based on
- // those files to FileBasedSources based on sub ranges of single files.
-
- if (mode == Mode.FILEPATTERN) {
- long startTime = System.currentTimeMillis();
- List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures = new ArrayList<>();
-
- ListeningExecutorService service = // A fresh pool per call; shut down in the finally below.
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
- try {
- for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
- futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
- }
- List<? extends FileBasedSource<T>> splitResults = // Wait for all files, then flatten.
- ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
- LOG.debug(
- "Splitting the source based on file pattern {} took {} ms",
- fileOrPatternSpec,
- System.currentTimeMillis() - startTime);
- return splitResults;
- } finally {
- service.shutdown();
- }
- } else {
- if (isSplittable()) {
- List<FileBasedSource<T>> splitResults = new ArrayList<>();
- for (OffsetBasedSource<T> split :
- super.splitIntoBundles(desiredBundleSizeBytes, options)) {
- splitResults.add((FileBasedSource<T>) split); // NOTE(review): assumes the superclass builds splits via createSourceForSubrange — confirm.
- }
- return splitResults;
- } else {
- LOG.debug("The source for file {} is not split into sub-range based sources since "
- + "the file is not seekable",
- fileOrPatternSpec);
- return ImmutableList.of(this); // The unsplit source is the only bundle.
- }
- }
- }
-
- /**
- * Determines whether a file represented by this source can be split into bundles.
- *
- * <p>By default, a file is splittable if it is on a file system that supports efficient read
- * seeking. Subclasses may override to provide different behavior.
- *
- * @return {@code true} if the source may be split into subrange sources.
- */
- protected boolean isSplittable() throws Exception {
- // We split a file-based source into subranges only if the file is efficiently seekable.
- // If a file is not efficiently seekable it would be highly inefficient to create and read a
- // source based on a subrange of that file.
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
- return factory.isReadSeekEfficient(fileOrPatternSpec);
- }
-
- @Override
- public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- // Validate the current source prior to creating a reader for it.
- this.validate();
-
- if (mode == Mode.FILEPATTERN) {
- long startTime = System.currentTimeMillis();
- Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
- List<FileBasedReader<T>> fileReaders = new ArrayList<>(); // One reader per matched file.
- for (String fileName : files) {
- long endOffset;
- try {
- endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
- } catch (IOException e) {
- LOG.warn("Failed to get size of {}", fileName, e);
- endOffset = Long.MAX_VALUE; // Best effort: fall back to the "unknown end" offset.
- }
- fileReaders.add(
- createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
- }
- LOG.debug(
- "Creating a reader for file pattern {} took {} ms",
- fileOrPatternSpec,
- System.currentTimeMillis() - startTime);
- if (fileReaders.size() == 1) { // A single match needs no concatenating wrapper.
- return fileReaders.get(0);
- }
- return new FilePatternReader(this, fileReaders);
- } else {
- return createSingleFileReader(options);
- }
- }
-
- @Override
- public String toString() { // Pattern sources print the spec only; others append the range.
- switch (mode) {
- case FILEPATTERN:
- return fileOrPatternSpec;
- case SINGLE_FILE_OR_SUBRANGE:
- return fileOrPatternSpec + " range " + super.toString();
- default:
- throw new IllegalStateException("Unexpected mode: " + mode);
- }
- }
-
- @Override
- public void validate() {
- super.validate();
- switch (mode) {
- case FILEPATTERN: // Pattern sources must start at 0 and end at Long.MAX_VALUE.
- checkArgument(getStartOffset() == 0,
- "FileBasedSource is based on a file pattern or a full single file "
- + "but the starting offset proposed %s is not zero", getStartOffset());
- checkArgument(getEndOffset() == Long.MAX_VALUE,
- "FileBasedSource is based on a file pattern or a full single file "
- + "but the ending offset proposed %s is not Long.MAX_VALUE", getEndOffset());
- break;
- case SINGLE_FILE_OR_SUBRANGE:
- // Nothing more to validate.
- break;
- default:
- throw new IllegalStateException("Unknown mode: " + mode);
- }
- }
-
- @Override
- public final long getMaxEndOffset(PipelineOptions options) throws IOException {
- checkArgument(
- mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
- if (getEndOffset() == Long.MAX_VALUE) { // Unknown end: resolve it to the file's size.
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
- return factory.getSizeBytes(fileOrPatternSpec);
- } else {
- return getEndOffset();
- }
- }
-
- protected static final Collection<String> expandFilePattern(String fileOrPatternSpec)
- throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
- Collection<String> matches = factory.match(fileOrPatternSpec); // Files matching the spec.
- LOG.info("Matched {} files for pattern {}", matches.size(), fileOrPatternSpec);
- return matches;
- }
-
- /**
- * A {@link Source.Reader reader} that implements code common to readers of
- * {@code FileBasedSource}s.
- *
- * <h2>Seekability</h2>
- *
- * <p>This reader uses a {@link ReadableByteChannel} created for the file represented by the
- * corresponding source to efficiently move to the correct starting position defined in the
- * source. Subclasses of this reader should implement {@link #startReading} to get access to this
- * channel. If the source corresponding to the reader is for a subrange of a file the
- * {@code ReadableByteChannel} provided is guaranteed to be an instance of the type
- * {@link SeekableByteChannel}, which may be used by subclasses to traverse back in the channel
- * to determine the correct starting position.
- *
- * <h2>Reading Records</h2>
- *
- * <p>Sequential reading is implemented using {@link #readNextRecord}.
- *
- * <p>Then {@code FileBasedReader} implements "reading a range [A, B)" in the following way.
- * <ol>
- * <li>{@link #start} opens the file
- * <li>{@link #start} seeks the {@code SeekableByteChannel} to A (reading offset ranges for
- * non-seekable files is not supported) and calls {@code startReading()}
- * <li>{@link #start} calls {@link #advance} once, which, via {@link #readNextRecord},
- * locates the first record which is at a split point AND its offset is at or after A.
- * If this record is at or after B, {@link #advance} returns false and reading is finished.
- * <li>if the previous advance call returned {@code true} sequential reading starts and
- * {@code advance()} will be called repeatedly
- * </ol>
- * {@code advance()} calls {@code readNextRecord()} on the subclass, and stops (returns false) if
- * the new record is at a split point AND the offset of the new record is at or after B.
- *
- * <h2>Thread Safety</h2>
- *
- * <p>Since this class implements {@link Source.Reader} it guarantees thread safety. Abstract
- * methods defined here will not be accessed by more than one thread concurrently.
- */
- public abstract static class FileBasedReader<T> extends OffsetBasedReader<T> {
- private ReadableByteChannel channel = null; // Opened in startImpl(); null until then.
-
- /**
- * Subclasses should not perform IO operations in the constructor. All IO operations should be
- * delayed until the {@link #startReading} method is invoked.
- *
- * @param source the single-file or subrange source to read; file pattern sources are rejected.
- */
- public FileBasedReader(FileBasedSource<T> source) {
- super(source);
- checkArgument(source.getMode() != Mode.FILEPATTERN,
- "FileBasedReader does not support reading file patterns");
- }
-
- @Override
- public FileBasedSource<T> getCurrentSource() {
- return (FileBasedSource<T>) super.getCurrentSource();
- }
-
- @Override
- protected final boolean startImpl() throws IOException {
- FileBasedSource<T> source = getCurrentSource();
- IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
- this.channel = factory.open(source.getFileOrPatternSpec());
-
- if (channel instanceof SeekableByteChannel) {
- SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
- seekChannel.position(source.getStartOffset()); // Jump straight to the range start.
- } else {
- // Channel is not seekable. Must not be a subrange.
- checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
- "Subrange-based sources must only be defined for file types that support seekable "
- + " read channels"); // NOTE(review): the constructor rejects FILEPATTERN, so this looks always-false — confirm.
- checkArgument(source.getStartOffset() == 0,
- "Start offset %s is not zero but channel for reading the file is not seekable.",
- source.getStartOffset());
- }
-
- startReading(channel);
-
- // Advance once to load the first record.
- return advanceImpl();
- }
-
- @Override
- protected final boolean advanceImpl() throws IOException {
- return readNextRecord();
- }
-
- /**
- * Closes any {@link ReadableByteChannel} created for the current reader. This implementation is
- * idempotent. Any {@code close()} method introduced by a subclass must be idempotent and must
- * call the {@code close()} method in the {@code FileBasedReader}.
- */
- @Override
- public void close() throws IOException {
- if (channel != null) { // Nothing to close if the reader was never started.
- channel.close();
- }
- }
-
- /**
- * Performs any initialization of the subclass of {@code FileBasedReader} that involves IO
- * operations. Will only be invoked once and before that invocation the base class will seek the
- * channel to the source's starting offset.
- *
- * <p>Provided {@link ReadableByteChannel} is for the file represented by the source of this
- * reader. Subclasses may use the {@code channel} to build a higher level IO abstraction, e.g., a
- * BufferedReader or an XML parser.
- *
- * <p>If the corresponding source is for a subrange of a file, {@code channel} is guaranteed to
- * be an instance of the type {@link SeekableByteChannel}.
- *
- * <p>After this method is invoked the base class will not be reading data from the channel or
- * adjusting the position of the channel. But the base class is responsible for properly closing
- * the channel.
- *
- * @param channel a byte channel representing the file backing the reader.
- */
- protected abstract void startReading(ReadableByteChannel channel) throws IOException;
-
- /**
- * Reads the next record from the channel provided by {@link #startReading}. Methods
- * {@link #getCurrent}, {@link #getCurrentOffset}, and {@link #isAtSplitPoint()} should return
- * the corresponding information about the record read by the last invocation of this method.
- *
- * <p>Note that this method will be called the same way for reading the first record in the
- * source (file or offset range in the file) and for reading subsequent records. It is up to the
- * subclass to do anything special for locating and reading the first record, if necessary.
- *
- * @return {@code true} if a record was successfully read, {@code false} if the end of the
- * channel was reached before successfully reading a new record.
- */
- protected abstract boolean readNextRecord() throws IOException;
- }
-
- // An internal Reader implementation that concatenates a sequence of FileBasedReaders.
- private class FilePatternReader extends BoundedReader<T> {
- private final FileBasedSource<T> source;
- private final List<FileBasedReader<T>> fileReaders;
- final ListIterator<FileBasedReader<T>> fileReadersIterator; // Positioned just past currentReader.
- FileBasedReader<T> currentReader = null; // Null until start() is called.
-
- public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>> fileReaders) {
- this.source = source;
- this.fileReaders = fileReaders;
- this.fileReadersIterator = fileReaders.listIterator();
- }
-
- @Override
- public boolean start() throws IOException {
- return startNextNonemptyReader();
- }
-
- @Override
- public boolean advance() throws IOException {
- checkState(currentReader != null, "Call start() before advance()");
- if (currentReader.advance()) {
- return true;
- }
- return startNextNonemptyReader(); // Current file is exhausted: move on to the next one.
- }
-
- private boolean startNextNonemptyReader() throws IOException {
- while (fileReadersIterator.hasNext()) {
- currentReader = fileReadersIterator.next();
- if (currentReader.start()) {
- return true;
- }
- currentReader.close(); // This reader produced no records: release it and try the next.
- }
- return false;
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- // A NoSuchElement will be thrown by the last FileBasedReader if getCurrent() is called after
- // advance() returns false.
- return currentReader.getCurrent();
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- // A NoSuchElement will be thrown by the last FileBasedReader if getCurrentTimestamp()
- // is called after advance() returns false.
- return currentReader.getCurrentTimestamp();
- }
-
- @Override
- public void close() throws IOException {
- // Close all readers that may have not yet been closed.
- // If this reader has not been started, currentReader is null.
- if (currentReader != null) {
- currentReader.close(); // NOTE(review): if this throws, the remaining readers are never closed.
- }
- while (fileReadersIterator.hasNext()) {
- fileReadersIterator.next().close();
- }
- }
-
- @Override
- public FileBasedSource<T> getCurrentSource() {
- return source;
- }
-
- @Override
- public FileBasedSource<T> splitAtFraction(double fraction) {
- // Unsupported. TODO: implement.
- LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
- return null;
- }
-
- @Override
- public Double getFractionConsumed() {
- if (currentReader == null) { // Not started yet.
- return 0.0;
- }
- if (fileReaders.isEmpty()) { // NOTE(review): currentReader is non-null here, so this looks unreachable.
- return 1.0;
- }
- int index = fileReadersIterator.previousIndex(); // Index of the reader returned by the last next().
- int numReaders = fileReaders.size();
- if (index == numReaders) { // NOTE(review): previousIndex() is at most numReaders - 1 — confirm intent.
- return 1.0;
- }
- double before = 1.0 * index / numReaders; // Each file is weighted equally, regardless of size.
- double after = 1.0 * (index + 1) / numReaders;
- Double fractionOfCurrentReader = currentReader.getFractionConsumed();
- if (fractionOfCurrentReader == null) {
- return before;
- }
- return before + fractionOfCurrentReader * (after - before);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
deleted file mode 100644
index 288688e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;
-import com.google.cloud.dataflow.sdk.io.range.RangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link BoundedSource} that uses offsets to define starting and ending positions.
- *
- * <p>{@link OffsetBasedSource} is a common base class for all bounded sources where the input can
- * be represented as a single range, and an input can be efficiently processed in parallel by
- * splitting the range into a set of disjoint ranges whose union is the original range. This class
- * should be used for sources that can be cheaply read starting at any given offset.
- * {@link OffsetBasedSource} stores the range and implements splitting into bundles.
- *
- * <p>Extend {@link OffsetBasedSource} to implement your own offset-based custom source.
- * {@link FileBasedSource}, which is a subclass of this, adds additional functionality useful for
- * custom sources that are based on files. If possible, implementors should start from
- * {@link FileBasedSource} instead of {@link OffsetBasedSource}.
- *
- * <p>Consult {@link RangeTracker} for important semantics common to all sources defined by a range
- * of positions of a certain type, including the semantics of split points
- * ({@link OffsetBasedReader#isAtSplitPoint}).
- *
- * @param <T> Type of records represented by the source.
- * @see BoundedSource
- * @see FileBasedSource
- * @see RangeTracker
- */
-public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
- private final long startOffset;
- private final long endOffset;
- private final long minBundleSize;
-
- /**
- * @param startOffset starting offset (inclusive) of the source. Must be non-negative.
- *
- * @param endOffset ending offset (exclusive) of the source. Use {@link Long#MAX_VALUE} to
- * indicate that the entire source after {@code startOffset} should be read. Must be
- * {@code > startOffset}.
- *
- * @param minBundleSize minimum bundle size in offset units that should be used when splitting the
- * source into sub-sources. This value may not be respected if the total
- * range of the source is smaller than the specified {@code minBundleSize}.
- * Must be non-negative.
- */
- public OffsetBasedSource(long startOffset, long endOffset, long minBundleSize) {
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- this.minBundleSize = minBundleSize;
- }
-
- /**
- * Returns the starting offset of the source.
- */
- public long getStartOffset() {
- return startOffset;
- }
-
- /**
- * Returns the specified ending offset of the source. Any returned value greater than or equal to
- * {@link #getMaxEndOffset(PipelineOptions)} should be treated as
- * {@link #getMaxEndOffset(PipelineOptions)}.
- */
- public long getEndOffset() {
- return endOffset;
- }
-
- /**
- * Returns the minimum bundle size that should be used when splitting the source into sub-sources.
- * This value may not be respected if the total range of the source is smaller than the specified
- * {@code minBundleSize}.
- */
- public long getMinBundleSize() {
- return minBundleSize;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- long trueEndOffset = (endOffset == Long.MAX_VALUE) ? getMaxEndOffset(options) : endOffset;
- return getBytesPerOffset() * (trueEndOffset - getStartOffset());
- }
-
- @Override
- public List<? extends OffsetBasedSource<T>> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
- // make sure that we do not end up with a too small bundle at the end. If the desired bundle
- // size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
-
- long desiredBundleSizeOffsetUnits = Math.max(
- Math.max(1, desiredBundleSizeBytes / getBytesPerOffset()),
- minBundleSize);
-
- List<OffsetBasedSource<T>> subSources = new ArrayList<>();
- long start = startOffset;
- long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
-
- while (start < maxEnd) {
- long end = start + desiredBundleSizeOffsetUnits;
- end = Math.min(end, maxEnd);
- // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
- long remaining = maxEnd - end;
- if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
- end = maxEnd;
- }
- subSources.add(createSourceForSubrange(start, end));
-
- start = end;
- }
- return subSources;
- }
-
- @Override
- public void validate() {
- Preconditions.checkArgument(
- this.startOffset >= 0,
- "Start offset has value %s, must be non-negative", this.startOffset);
- Preconditions.checkArgument(
- this.endOffset >= 0,
- "End offset has value %s, must be non-negative", this.endOffset);
- Preconditions.checkArgument(
- this.startOffset < this.endOffset,
- "Start offset %s must be before end offset %s",
- this.startOffset, this.endOffset);
- Preconditions.checkArgument(
- this.minBundleSize >= 0,
- "minBundleSize has value %s, must be non-negative",
- this.minBundleSize);
- }
-
- @Override
- public String toString() {
- return "[" + startOffset + ", " + endOffset + ")";
- }
-
- /**
- * Returns approximately how many bytes of data correspond to a single offset in this source.
- * Used for translation between this source's range and methods defined in terms of bytes, such
- * as {@link #getEstimatedSizeBytes} and {@link #splitIntoBundles}.
- *
- * <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources.
- */
- public long getBytesPerOffset() {
- return 1L;
- }
-
- /**
- * Returns the actual ending offset of the current source. The value returned by this function
- * will be used to clip the end of the range {@code [startOffset, endOffset)} such that the
- * range used is {@code [startOffset, min(endOffset, maxEndOffset))}.
- *
- * <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose
- * that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to
- * indicate that a file should be read to the end. Then {@link #getMaxEndOffset} should determine
- * the actual, exact size of the file in bytes and return it.
- */
- public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;
-
- /**
- * Returns an {@link OffsetBasedSource} for a subrange of the current source. The
- * subrange {@code [start, end)} must be within the range {@code [startOffset, endOffset)} of
- * the current source, i.e. {@code startOffset <= start < end <= endOffset}.
- */
- public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
-
- /**
- * Whether this source should allow dynamic splitting of the offset ranges.
- *
- * <p>True by default. Override this to return false if the source cannot
- * support dynamic splitting correctly. If this returns false,
- * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
- */
- public boolean allowsDynamicSplitting() {
- return true;
- }
-
- /**
- * A {@link Source.Reader} that implements code common to readers of all
- * {@link OffsetBasedSource}s.
- *
- * <p>Subclasses have to implement:
- * <ul>
- * <li>The methods {@link #startImpl} and {@link #advanceImpl} for reading the
- * first or subsequent records.
- * <li>The methods {@link #getCurrent}, {@link #getCurrentOffset}, and optionally
- * {@link #isAtSplitPoint} and {@link #getCurrentTimestamp} to access properties of
- * the last record successfully read by {@link #startImpl} or {@link #advanceImpl}.
- * </ul>
- */
- public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
- private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
-
- private OffsetBasedSource<T> source;
-
- /** The {@link OffsetRangeTracker} managing the range and current position of the source. */
- private final OffsetRangeTracker rangeTracker;
-
- /**
- * @param source the {@link OffsetBasedSource} to be read by the current reader.
- */
- public OffsetBasedReader(OffsetBasedSource<T> source) {
- this.source = source;
- this.rangeTracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
- }
-
- /**
- * Returns the <i>starting</i> offset of the {@link Source.Reader#getCurrent current record},
- * which has been read by the last successful {@link Source.Reader#start} or
- * {@link Source.Reader#advance} call.
- * <p>If no such call has been made yet, the return value is unspecified.
- * <p>See {@link RangeTracker} for description of offset semantics.
- */
- protected abstract long getCurrentOffset() throws NoSuchElementException;
-
- /**
- * Returns whether the current record is at a split point (i.e., whether the current record
- * would be the first record to be read by a source with a specified start offset of
- * {@link #getCurrentOffset}).
- *
- * <p>See detailed documentation about split points in {@link RangeTracker}.
- */
- protected boolean isAtSplitPoint() throws NoSuchElementException {
- return true;
- }
-
- @Override
- public final boolean start() throws IOException {
- return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
- }
-
- @Override
- public final boolean advance() throws IOException {
- return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
- }
-
- /**
- * Initializes the {@link OffsetBasedSource.OffsetBasedReader} and advances to the first record,
- * returning {@code true} if there is a record available to be read. This method will be
- * invoked exactly once and may perform expensive setup operations that are needed to
- * initialize the reader.
- *
- * <p>This function is the {@code OffsetBasedReader} implementation of
- * {@link BoundedReader#start}. The key difference is that the implementor can ignore the
- * possibility that it should no longer produce the first record, either because it has exceeded
- * the original {@code endOffset} assigned to the reader, or because a concurrent call to
- * {@link #splitAtFraction} has changed the source to shrink the offset range being read.
- *
- * @see BoundedReader#start
- */
- protected abstract boolean startImpl() throws IOException;
-
- /**
- * Advances to the next record and returns {@code true}, or returns false if there is no next
- * record.
- *
- * <p>This function is the {@code OffsetBasedReader} implementation of
- * {@link BoundedReader#advance}. The key difference is that the implementor can ignore the
- * possibility that it should no longer produce the next record, either because it has exceeded
- * the original {@code endOffset} assigned to the reader, or because a concurrent call to
- * {@link #splitAtFraction} has changed the source to shrink the offset range being read.
- *
- * @see BoundedReader#advance
- */
- protected abstract boolean advanceImpl() throws IOException;
-
- @Override
- public synchronized OffsetBasedSource<T> getCurrentSource() {
- return source;
- }
-
- @Override
- public Double getFractionConsumed() {
- return rangeTracker.getFractionConsumed();
- }
-
- @Override
- public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
- if (!getCurrentSource().allowsDynamicSplitting()) {
- return null;
- }
- if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {
- LOG.debug(
- "Refusing to split unbounded OffsetBasedReader {} at fraction {}",
- rangeTracker, fraction);
- return null;
- }
- long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
- LOG.debug(
- "Proposing to split OffsetBasedReader {} at fraction {} (offset {})",
- rangeTracker, fraction, splitOffset);
- if (!rangeTracker.trySplitAtPosition(splitOffset)) {
- return null;
- }
- long start = source.getStartOffset();
- long end = source.getEndOffset();
- OffsetBasedSource<T> primary = source.createSourceForSubrange(start, splitOffset);
- OffsetBasedSource<T> residual = source.createSourceForSubrange(splitOffset, end);
- this.source = primary;
- return residual;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
deleted file mode 100644
index e5b8a39..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.api.client.repackaged.com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * A helper interface for talking to Pubsub via an underlying transport.
- */
-public interface PubsubClient extends AutoCloseable {
- /**
- * Path representing a cloud project id.
- */
- class ProjectPath implements Serializable {
- private final String path;
-
- public ProjectPath(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ProjectPath that = (ProjectPath) o;
-
- return path.equals(that.path);
-
- }
-
- @Override
- public int hashCode() {
- return path.hashCode();
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static ProjectPath fromId(String projectId) {
- return new ProjectPath(String.format("projects/%s", projectId));
- }
- }
-
- /**
- * Path representing a Pubsub subscription.
- */
- class SubscriptionPath implements Serializable {
- private final String path;
-
- public SubscriptionPath(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- public String getV1Beta1Path() {
- String[] splits = path.split("/");
- Preconditions.checkState(splits.length == 4);
- return String.format("/subscriptions/%s/%s", splits[1], splits[3]);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SubscriptionPath that = (SubscriptionPath) o;
- return path.equals(that.path);
- }
-
- @Override
- public int hashCode() {
- return path.hashCode();
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static SubscriptionPath fromName(String projectId, String subscriptionName) {
- return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
- projectId, subscriptionName));
- }
- }
-
- /**
- * Path representing a Pubsub topic.
- */
- class TopicPath implements Serializable {
- private final String path;
-
- public TopicPath(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- public String getV1Beta1Path() {
- String[] splits = path.split("/");
- Preconditions.checkState(splits.length == 4);
- return String.format("/topics/%s/%s", splits[1], splits[3]);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TopicPath topicPath = (TopicPath) o;
- return path.equals(topicPath.path);
- }
-
- @Override
- public int hashCode() {
- return path.hashCode();
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static TopicPath fromName(String projectId, String topicName) {
- return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
- }
- }
-
- /**
- * A message to be sent to Pubsub.
- */
- class OutgoingMessage {
- /**
- * Underlying (encoded) element.
- */
- public final byte[] elementBytes;
-
- /**
- * Timestamp for element (ms since epoch).
- */
- public final long timestampMsSinceEpoch;
-
- public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
- this.elementBytes = elementBytes;
- this.timestampMsSinceEpoch = timestampMsSinceEpoch;
- }
- }
-
- /**
- * A message received from Pubsub.
- */
- class IncomingMessage {
- /**
- * Underlying (encoded) element.
- */
- public final byte[] elementBytes;
-
- /**
- * Timestamp for element (ms since epoch). Either Pubsub's processing time,
- * or the custom timestamp associated with the message.
- */
- public final long timestampMsSinceEpoch;
-
- /**
- * Timestamp (in system time) at which we requested the message (ms since epoch).
- */
- public final long requestTimeMsSinceEpoch;
-
- /**
- * Id to pass back to Pubsub to acknowledge receipt of this message.
- */
- public final String ackId;
-
- /**
- * Id to pass to the runner to distinguish this message from all others.
- */
- public final byte[] recordId;
-
- public IncomingMessage(
- byte[] elementBytes,
- long timestampMsSinceEpoch,
- long requestTimeMsSinceEpoch,
- String ackId,
- byte[] recordId) {
- this.elementBytes = elementBytes;
- this.timestampMsSinceEpoch = timestampMsSinceEpoch;
- this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
- this.ackId = ackId;
- this.recordId = recordId;
- }
- }
-
- /**
- * Gracefully close the underlying transport.
- */
- @Override
- void close();
-
-
- /**
- * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
- * published.
- *
- * @throws IOException
- */
- int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException;
-
- /**
- * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
- * Return the received messages, or empty collection if none were available. Does not
- * wait for messages to arrive. Returned messages will record their request time
- * as {@code requestTimeMsSinceEpoch}.
- *
- * @throws IOException
- */
- Collection<IncomingMessage> pull(
- long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize)
- throws IOException;
-
- /**
- * Acknowledge messages from {@code subscription} with {@code ackIds}.
- *
- * @throws IOException
- */
- void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds) throws IOException;
-
- /**
- * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
- * be {@code deadlineSeconds} from now.
- *
- * @throws IOException
- */
- void modifyAckDeadline(
- SubscriptionPath subscription, Iterable<String> ackIds,
- int deadlineSeconds)
- throws IOException;
-
- /**
- * Create {@code topic}.
- *
- * @throws IOException
- */
- void createTopic(TopicPath topic) throws IOException;
-
- /**
- * Delete {@code topic}.
- *
- * @throws IOException
- */
- void deleteTopic(TopicPath topic) throws IOException;
-
- /**
- * Return a list of topics for {@code project}.
- *
- * @throws IOException
- */
- Collection<TopicPath> listTopics(ProjectPath project) throws IOException;
-
- /**
- * Create {@code subscription} to {@code topic}.
- *
- * @throws IOException
- */
- void createSubscription(
- TopicPath topic, SubscriptionPath subscription,
- int ackDeadlineSeconds) throws IOException;
-
- /**
- * Delete {@code subscription}.
- *
- * @throws IOException
- */
- void deleteSubscription(SubscriptionPath subscription) throws IOException;
-
- /**
- * Return a list of subscriptions for {@code topic} in {@code project}.
- *
- * @throws IOException
- */
- Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
deleted file mode 100644
index 6e34705..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.api.client.util.DateTime;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.DeleteSubscriptionRequest;
-import com.google.pubsub.v1.DeleteTopicRequest;
-import com.google.pubsub.v1.ListSubscriptionsRequest;
-import com.google.pubsub.v1.ListSubscriptionsResponse;
-import com.google.pubsub.v1.ListTopicsRequest;
-import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
-import com.google.pubsub.v1.Subscription;
-import com.google.pubsub.v1.Topic;
-import io.grpc.Channel;
-import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
-import io.grpc.auth.ClientAuthInterceptor;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for talking to Pubsub via grpc.
- */
-public class PubsubGrpcClient implements PubsubClient {
- private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
- private static final int PUBSUB_PORT = 443;
- private static final List<String> PUBSUB_SCOPES =
- Collections.singletonList("https://www.googleapis.com/auth/pubsub");
- private static final int LIST_BATCH_SIZE = 1000;
-
- /**
- * Timeout for grpc calls (in s).
- */
- private static final int TIMEOUT_S = 15;
-
- /**
- * Underlying netty channel, or {@literal null} if closed.
- */
- @Nullable
- private ManagedChannel publisherChannel;
-
- /**
- * Credentials determined from options and environment.
- */
- private final GoogleCredentials credentials;
-
- /**
- * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
- * instead.
- */
- @Nullable
- private final String timestampLabel;
-
- /**
- * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
- */
- @Nullable
- private final String idLabel;
-
- /**
- * Cached stubs, or null if not cached.
- */
- @Nullable
- private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
- private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
-
- private PubsubGrpcClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
- ManagedChannel publisherChannel, GoogleCredentials credentials) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.publisherChannel = publisherChannel;
- this.credentials = credentials;
- }
-
- /**
- * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order
- * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources
- * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use
- * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within
- * message metadata.
- */
- public static PubsubGrpcClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
- GcpOptions options) throws IOException {
- ManagedChannel channel = NettyChannelBuilder
- .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
- .negotiationType(NegotiationType.TLS)
- .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
- .build();
- // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
- // various command line options. It currently only supports the older
- // com.google.api.client.auth.oauth2.Credentials.
- GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
- return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials);
- }
-
- /**
- * Gracefully close the underlying netty channel.
- */
- @Override
- public void close() {
- Preconditions.checkState(publisherChannel != null, "Client has already been closed");
- publisherChannel.shutdown();
- try {
- publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // Stop waiting, but restore the interrupt status so it is not lost.
- Thread.currentThread().interrupt();
- }
- publisherChannel = null;
- cachedPublisherStub = null;
- cachedSubscriberStub = null;
- }
-
- /**
- * Return channel with interceptor for returning credentials.
- */
- private Channel newChannel() throws IOException {
- Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
- ClientAuthInterceptor interceptor =
- new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
- return ClientInterceptors.intercept(publisherChannel, interceptor);
- }
-
- /**
- * Return a stub for making a publish request with a timeout.
- */
- private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException {
- if (cachedPublisherStub == null) {
- cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
- }
- return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
- }
-
- /**
- * Return a stub for making a subscribe request with a timeout.
- */
- private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException {
- if (cachedSubscriberStub == null) {
- cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
- }
- return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
- }
-
- @Override
- public int publish(TopicPath topic, Iterable<OutgoingMessage> outgoingMessages)
- throws IOException {
- PublishRequest.Builder request = PublishRequest.newBuilder()
- .setTopic(topic.getPath());
- for (OutgoingMessage outgoingMessage : outgoingMessages) {
- PubsubMessage.Builder message =
- PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
-
- if (timestampLabel != null) {
- message.getMutableAttributes()
- .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
- }
-
- if (idLabel != null) {
- message.getMutableAttributes()
- .put(idLabel,
- Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
- }
-
- request.addMessages(message);
- }
-
- PublishResponse response = publisherStub().publish(request.build());
- return response.getMessageIdsCount();
- }
-
- @Override
- public Collection<IncomingMessage> pull(
- long requestTimeMsSinceEpoch,
- SubscriptionPath subscription,
- int batchSize) throws IOException {
- PullRequest request = PullRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .setReturnImmediately(true)
- .setMaxMessages(batchSize)
- .build();
- PullResponse response = subscriberStub().pull(request);
- if (response.getReceivedMessagesCount() == 0) {
- return ImmutableList.of();
- }
- List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
- for (ReceivedMessage message : response.getReceivedMessagesList()) {
- PubsubMessage pubsubMessage = message.getMessage();
- Map<String, String> attributes = pubsubMessage.getAttributes();
-
- // Payload.
- byte[] elementBytes = pubsubMessage.getData().toByteArray();
-
- // Timestamp.
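- // Start with Pubsub processing time. NOTE(review): seconds + nanos / 1000 is not ms since epoch; expected seconds * 1000 + nanos / 1000000.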
- Timestamp timestampProto = pubsubMessage.getPublishTime();
- long timestampMsSinceEpoch = timestampProto.getSeconds() + timestampProto.getNanos() / 1000L;
- if (timestampLabel != null && attributes != null) {
- String timestampString = attributes.get(timestampLabel);
- if (timestampString != null && !timestampString.isEmpty()) {
- try {
- // Try parsing as milliseconds since epoch. Note there is no way to parse a
- // string in RFC 3339 format here.
- // Expected IllegalArgumentException if parsing fails; we use that to fall back
- // to RFC 3339.
- timestampMsSinceEpoch = Long.parseLong(timestampString);
- } catch (IllegalArgumentException e1) {
- try {
- // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
- // IllegalArgumentException if parsing fails, which is caught just below.
- timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue();
- } catch (IllegalArgumentException e2) {
- // Fallback to Pubsub processing time.
- }
- }
- }
- // else: fallback to Pubsub processing time.
- }
- // else: fallback to Pubsub processing time.
-
- // Ack id.
- String ackId = message.getAckId();
- Preconditions.checkState(ackId != null && !ackId.isEmpty());
-
- // Record id, if any.
- @Nullable byte[] recordId = null;
- if (idLabel != null && attributes != null) {
- String recordIdString = attributes.get(idLabel);
- if (recordIdString != null && !recordIdString.isEmpty()) {
- recordId = recordIdString.getBytes();
- }
- }
- if (recordId == null) {
- recordId = pubsubMessage.getMessageId().getBytes();
- }
-
- incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
- requestTimeMsSinceEpoch, ackId, recordId));
- }
- return incomingMessages;
- }
-
- /**
-  * Acknowledges the given {@code ackIds} against {@code subscription}.
-  *
-  * <p>An {@link AcknowledgeRequest} carrying the subscription path and every ack id is built
-  * and sent through {@code subscriberStub()}. The response is discarded.
-  *
-  * @param subscription subscription whose path is placed on the request
-  * @param ackIds ack ids added to the request
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code subscriberStub()}, which is not visible here -- confirm
-  */
- @Override
- public void acknowledge(SubscriptionPath subscription, Iterable<String> ackIds)
-     throws IOException {
-   AcknowledgeRequest.Builder builder = AcknowledgeRequest.newBuilder();
-   builder.setSubscription(subscription.getPath());
-   builder.addAllAckIds(ackIds);
-   AcknowledgeRequest ackRequest = builder.build();
-   // The call only yields an Empty message, so there is nothing to hand back.
-   subscriberStub().acknowledge(ackRequest);
- }
-
- /**
-  * Sets the ack deadline of the given {@code ackIds} on {@code subscription} to
-  * {@code deadlineSeconds}.
-  *
-  * <p>A {@link ModifyAckDeadlineRequest} is built from the three arguments and sent through
-  * {@code subscriberStub()}. The response is discarded.
-  *
-  * @param subscription subscription whose path is placed on the request
-  * @param ackIds ack ids added to the request
-  * @param deadlineSeconds value passed to {@code setAckDeadlineSeconds}
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code subscriberStub()}, which is not visible here -- confirm
-  */
- @Override
- public void modifyAckDeadline(
-     SubscriptionPath subscription, Iterable<String> ackIds, int deadlineSeconds)
-     throws IOException {
-   ModifyAckDeadlineRequest.Builder builder = ModifyAckDeadlineRequest.newBuilder();
-   builder.setSubscription(subscription.getPath());
-   builder.addAllAckIds(ackIds);
-   builder.setAckDeadlineSeconds(deadlineSeconds);
-   ModifyAckDeadlineRequest deadlineRequest = builder.build();
-   // The call only yields an Empty message, so there is nothing to hand back.
-   subscriberStub().modifyAckDeadline(deadlineRequest);
- }
-
- /**
-  * Creates the topic named by {@code topic}.
-  *
-  * <p>A {@link Topic} message whose name is the topic path is sent through
-  * {@code publisherStub()}. The {@link Topic} returned by the call is discarded.
-  *
-  * @param topic path used as the name of the new topic
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code publisherStub()}, which is not visible here -- confirm
-  */
- @Override
- public void createTopic(TopicPath topic) throws IOException {
-   String topicName = topic.getPath();
-   Topic newTopic = Topic.newBuilder().setName(topicName).build();
-   // The Topic returned by the call is intentionally not used.
-   publisherStub().createTopic(newTopic);
- }
-
- /**
-  * Deletes the topic named by {@code topic}.
-  *
-  * <p>A {@link DeleteTopicRequest} carrying the topic path is sent through
-  * {@code publisherStub()}. The response is discarded.
-  *
-  * @param topic path of the topic placed on the request
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code publisherStub()}, which is not visible here -- confirm
-  */
- @Override
- public void deleteTopic(TopicPath topic) throws IOException {
-   String topicName = topic.getPath();
-   DeleteTopicRequest deletion = DeleteTopicRequest.newBuilder().setTopic(topicName).build();
-   // The call only yields an Empty message, so there is nothing to hand back.
-   publisherStub().deleteTopic(deletion);
- }
-
- /**
-  * Lists the topics of {@code project}, following pagination until the service stops
-  * returning a next-page token.
-  *
-  * <p>Pages are requested with a page size of {@code LIST_BATCH_SIZE}. Every {@link Topic} on
-  * every page is converted to a {@link TopicPath} built from the topic name.
-  *
-  * <p>Only an empty next-page token ends the listing. A page that contains no topics but
-  * still carries a token does not stop the iteration, so results on later pages are not
-  * lost.
-  *
-  * @param project project whose path is placed on the list request
-  * @return the collected topic paths, in the order the pages returned them; empty if there are
-  *     none
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code publisherStub()}, which is not visible here -- confirm
-  */
- @Override
- public Collection<TopicPath> listTopics(ProjectPath project) throws IOException {
-   ListTopicsRequest.Builder request =
-       ListTopicsRequest.newBuilder()
-                        .setProject(project.getPath())
-                        .setPageSize(LIST_BATCH_SIZE);
-   ListTopicsResponse response = publisherStub().listTopics(request.build());
-   // Size for the first page; the list grows as further pages arrive.
-   List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
-   while (true) {
-     for (Topic topic : response.getTopicsList()) {
-       topics.add(new TopicPath(topic.getName()));
-     }
-     // Decide termination from the token alone: an empty page may still be followed by
-     // more pages, so the number of topics on a page must not end the listing.
-     String nextPageToken = response.getNextPageToken();
-     if (nextPageToken.isEmpty()) {
-       break;
-     }
-     request.setPageToken(nextPageToken);
-     response = publisherStub().listTopics(request.build());
-   }
-   return topics;
- }
-
- /**
-  * Creates {@code subscription} on {@code topic} with the given ack deadline.
-  *
-  * <p>A {@link Subscription} message is populated with the topic path, the subscription path
-  * (as its name) and {@code ackDeadlineSeconds}, then sent through {@code subscriberStub()}.
-  * The {@link Subscription} returned by the call is discarded.
-  *
-  * @param topic topic whose path is set on the new subscription
-  * @param subscription path used as the name of the new subscription
-  * @param ackDeadlineSeconds value passed to {@code setAckDeadlineSeconds}
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code subscriberStub()}, which is not visible here -- confirm
-  */
- @Override
- public void createSubscription(
-     TopicPath topic, SubscriptionPath subscription,
-     int ackDeadlineSeconds) throws IOException {
-   Subscription.Builder builder = Subscription.newBuilder();
-   builder.setTopic(topic.getPath());
-   builder.setName(subscription.getPath());
-   builder.setAckDeadlineSeconds(ackDeadlineSeconds);
-   Subscription newSubscription = builder.build();
-   // The Subscription returned by the call is intentionally not used.
-   subscriberStub().createSubscription(newSubscription);
- }
-
- /**
-  * Deletes the subscription named by {@code subscription}.
-  *
-  * <p>A {@link DeleteSubscriptionRequest} carrying the subscription path is sent through
-  * {@code subscriberStub()}. The response is discarded.
-  *
-  * @param subscription path of the subscription placed on the request
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code subscriberStub()}, which is not visible here -- confirm
-  */
- @Override
- public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-   String subscriptionName = subscription.getPath();
-   DeleteSubscriptionRequest deletion =
-       DeleteSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build();
-   // The call only yields an Empty message, so there is nothing to hand back.
-   subscriberStub().deleteSubscription(deletion);
- }
-
- /**
-  * Lists the subscriptions of {@code project} that are attached to {@code topic}.
-  *
-  * <p>The list request is scoped to the project only, with a page size of
-  * {@code LIST_BATCH_SIZE}. Filtering by topic happens here, by comparing each subscription's
-  * topic with {@code topic.getPath()}.
-  *
-  * <p>Only an empty next-page token ends the listing. A page that contains no subscriptions
-  * but still carries a token does not stop the iteration, so matches on later pages are not
-  * lost.
-  *
-  * @param project project whose path is placed on the list request
-  * @param topic topic that a subscription must reference to be included
-  * @return paths of the matching subscriptions, in the order the pages returned them; empty
-  *     if there are none
-  * @throws IOException declared on the signature; NOTE(review): presumably surfaced by
-  *     {@code subscriberStub()}, which is not visible here -- confirm
-  */
- @Override
- public Collection<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-     throws IOException {
-   ListSubscriptionsRequest.Builder request =
-       ListSubscriptionsRequest.newBuilder()
-                               .setProject(project.getPath())
-                               .setPageSize(LIST_BATCH_SIZE);
-   ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
-   // Upper bound for the first page; the list grows as further pages arrive.
-   List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
-   while (true) {
-     for (Subscription subscription : response.getSubscriptionsList()) {
-       // The request is not filtered by topic, so keep only the matching subscriptions.
-       if (subscription.getTopic().equals(topic.getPath())) {
-         subscriptions.add(new SubscriptionPath(subscription.getName()));
-       }
-     }
-     // Decide termination from the token alone: an empty page may still be followed by
-     // more pages, so the number of entries on a page must not end the listing.
-     String nextPageToken = response.getNextPageToken();
-     if (nextPageToken.isEmpty()) {
-       break;
-     }
-     request.setPageToken(nextPageToken);
-     response = subscriberStub().listSubscriptions(request.build());
-   }
-   return subscriptions;
- }
-}