You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:08 UTC
[44/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java
deleted file mode 100644
index be3a415..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link Source} that reads a finite amount of input and, because of that, supports
- * some additional operations.
- *
- * <p>The operations are:
- * <ul>
- * <li>Splitting into bundles of given size: {@link #splitIntoBundles};
- * <li>Size estimation: {@link #getEstimatedSizeBytes};
- * <li>Telling whether or not this source produces key/value pairs in sorted order:
- * {@link #producesSortedKeys};
- * <li>The reader ({@link BoundedReader}) supports progress estimation
- * ({@link BoundedReader#getFractionConsumed}) and dynamic splitting
- * ({@link BoundedReader#splitAtFraction}).
- * </ul>
- *
- * <p>To use this class for supporting your custom input type, derive your class
- * from it, and override the abstract methods. For an example, see {@link DatastoreIO}.
- *
- * @param <T> Type of records read by the source.
- */
-public abstract class BoundedSource<T> extends Source<T> {
-  /**
-   * Splits the source into bundles of approximately {@code desiredBundleSizeBytes}.
-   *
-   * @param desiredBundleSizeBytes the approximate target size, in bytes, of each bundle
-   * @param options the pipeline options in effect for this read
-   * @return the sources resulting from the split
-   */
-  public abstract List<? extends BoundedSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception;
-
-  /**
-   * An estimate of the total size (in bytes) of the data that would be read from this source.
-   * This estimate is in terms of external storage size, before any decompression or other
-   * processing done by the reader.
-   *
-   * @param options the pipeline options in effect for this read
-   */
-  public abstract long getEstimatedSizeBytes(PipelineOptions options) throws Exception;
-
-  /**
-   * Whether this source is known to produce key/value pairs sorted by lexicographic order on
-   * the bytes of the encoded key.
-   *
-   * @param options the pipeline options in effect for this read
-   */
-  public abstract boolean producesSortedKeys(PipelineOptions options) throws Exception;
-
-  /**
-   * Returns a new {@link BoundedReader} that reads from this source.
-   *
-   * @param options the pipeline options in effect for this read
-   * @throws IOException if the reader cannot be created
-   */
-  public abstract BoundedReader<T> createReader(PipelineOptions options) throws IOException;
-
-  /**
-   * A {@code Reader} that reads a bounded amount of input and supports some additional
-   * operations, such as progress estimation and dynamic work rebalancing.
-   *
-   * <h3>Boundedness</h3>
-   * <p>Once {@link #start} or {@link #advance} has returned false, neither will be called
-   * again on this object.
-   *
-   * <h3>Thread safety</h3>
-   * All methods will be run from the same thread except {@link #splitAtFraction},
-   * {@link #getFractionConsumed} and {@link #getCurrentSource}, which can be called concurrently
-   * from a different thread. There will not be multiple concurrent calls to
-   * {@link #splitAtFraction}, but there can be for {@link #getFractionConsumed} if
-   * {@link #splitAtFraction} is implemented.
-   *
-   * <p>If the source does not implement {@link #splitAtFraction}, you do not need to worry about
-   * thread safety. If implemented, it must be safe to call {@link #splitAtFraction} and
-   * {@link #getFractionConsumed} concurrently with other methods.
-   *
-   * <p>Additionally, a successful {@link #splitAtFraction} call must, by definition, cause
-   * {@link #getCurrentSource} to start returning a different value.
-   * Callers of {@link #getCurrentSource} need to be aware of the possibility that the returned
-   * value can change at any time, and must only access the properties of the source returned by
-   * {@link #getCurrentSource} which do not change between {@link #splitAtFraction} calls.
-   *
-   * <h3>Implementing {@link #splitAtFraction}</h3>
-   * In the course of dynamic work rebalancing, the method {@link #splitAtFraction}
-   * may be called concurrently with {@link #advance} or {@link #start}. It is critical that
-   * their interaction is implemented in a thread-safe way, otherwise data loss is possible.
-   *
-   * <p>Sources which support dynamic work rebalancing should use
-   * {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to manage the (source-specific)
-   * range of positions that is being split. If your source supports dynamic work rebalancing,
-   * please use that class to implement it if possible; if not possible, please contact the team
-   * at <i>dataflow-feedback@google.com</i>.
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  public abstract static class BoundedReader<T> extends Source.Reader<T> {
-    /**
-     * Returns a value in [0, 1] representing approximately what fraction of the
-     * {@link #getCurrentSource current source} this reader has read so far, or {@code null} if such
-     * an estimate is not available.
-     *
-     * <p>It is recommended that this method should satisfy the following properties:
-     * <ul>
-     * <li>Should return 0 before the {@link #start} call.
-     * <li>Should return 1 after a {@link #start} or {@link #advance} call that returns false.
-     * <li>The returned values should be non-decreasing (though they don't have to be unique).
-     * </ul>
-     *
-     * <p>By default, returns null to indicate that this cannot be estimated.
-     *
-     * <h5>Thread safety</h5>
-     * If {@link #splitAtFraction} is implemented, this method can be called concurrently with
-     * other methods (including itself), and it is therefore critical for it to be implemented
-     * in a thread-safe way.
-     */
-    public Double getFractionConsumed() {
-      return null;
-    }
-
-    /**
-     * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
-     * (including items already read).
-     *
-     * <h3>Usage</h3>
-     * <p>Reader subclasses can use this method for convenience to access unchanging properties of
-     * the source being read. Alternatively, they can cache these properties in the constructor.
-     * <p>The framework will call this method in the course of dynamic work rebalancing, e.g. after
-     * a successful {@link BoundedSource.BoundedReader#splitAtFraction} call.
-     *
-     * <h3>Mutability and thread safety</h3>
-     * Remember that {@link Source} objects must always be immutable. However, the return value of
-     * this function may be affected by dynamic work rebalancing, happening asynchronously via
-     * {@link BoundedSource.BoundedReader#splitAtFraction}, meaning it can return a different
-     * {@link Source} object. However, the returned object will still itself be immutable.
-     * Callers must take care not to rely on properties of the returned source that may be
-     * asynchronously changed as a result of this process (e.g. do not cache an end offset when
-     * reading a file).
-     *
-     * <h3>Implementation</h3>
-     * For convenience, subclasses should usually return the most concrete subclass of
-     * {@link Source} possible.
-     * In practice, the implementation of this method should nearly always be one of the following:
-     * <ul>
-     * <li>Source that inherits from a base class that already implements
-     * {@link #getCurrentSource}: delegate to base class. In this case, it is almost always
-     * an error for the subclass to maintain its own copy of the source.
-     * <pre>{@code
-     *   public FooReader(FooSource<T> source) {
-     *     super(source);
-     *   }
-     *
-     *   public FooSource<T> getCurrentSource() {
-     *     return (FooSource<T>)super.getCurrentSource();
-     *   }
-     * }</pre>
-     * <li>Source that does not support dynamic work rebalancing: return a private final variable.
-     * <pre>{@code
-     *   private final FooSource<T> source;
-     *
-     *   public FooReader(FooSource<T> source) {
-     *     this.source = source;
-     *   }
-     *
-     *   public FooSource<T> getCurrentSource() {
-     *     return source;
-     *   }
-     * }</pre>
-     * <li>{@link BoundedSource.BoundedReader} that explicitly supports dynamic work rebalancing:
-     * maintain a variable pointing to an immutable source object, and protect it with
-     * synchronization.
-     * <pre>{@code
-     *   private FooSource<T> source;
-     *
-     *   public FooReader(FooSource<T> source) {
-     *     this.source = source;
-     *   }
-     *
-     *   public synchronized FooSource<T> getCurrentSource() {
-     *     return source;
-     *   }
-     *
-     *   public synchronized FooSource<T> splitAtFraction(double fraction) {
-     *     ...
-     *     FooSource<T> primary = ...;
-     *     FooSource<T> residual = ...;
-     *     this.source = primary;
-     *     return residual;
-     *   }
-     * }</pre>
-     * </ul>
-     */
-    @Override
-    public abstract BoundedSource<T> getCurrentSource();
-
-    /**
-     * Tells the reader to narrow the range of the input it's going to read and give up
-     * the remainder, so that the new range would contain approximately the given
-     * fraction of the amount of data in the current range.
-     *
-     * <p>Returns a {@code BoundedSource} representing the remainder.
-     *
-     * <h5>Detailed description</h5>
-     * Assuming the following sequence of calls:
-     * <pre>{@code
-     *   BoundedSource<T> initial = reader.getCurrentSource();
-     *   BoundedSource<T> residual = reader.splitAtFraction(fraction);
-     *   BoundedSource<T> primary = reader.getCurrentSource();
-     * }</pre>
-     * <ul>
-     * <li> The "primary" and "residual" sources, when read, should together cover the same
-     * set of records as "initial".
-     * <li> The current reader should continue to be in a valid state, and continuing to read
-     * from it should, together with the records it already read, yield the same records
-     * as would have been read by "primary".
-     * <li> The amount of data read by "primary" should ideally represent approximately
-     * the given fraction of the amount of data read by "initial".
-     * </ul>
-     *
-     * <p>For example, a reader that reads a range of offsets <i>[A, B)</i> in a file might
-     * implement this method by truncating the current range to <i>[A, A + fraction*(B-A))</i>
-     * and returning a Source representing the range <i>[A + fraction*(B-A), B)</i>.
-     *
-     * <p>This method should return {@code null} if the split cannot be performed for this fraction
-     * while satisfying the semantics above. E.g., a reader that reads a range of offsets
-     * in a file should return {@code null} if it is already past the position in its range
-     * corresponding to the given fraction. In this case, the method MUST have no effect
-     * (the reader must behave as if the method hadn't been called at all).
-     *
-     * <h5>Statefulness</h5>
-     * Since this method (if successful) affects the reader's source, in subsequent invocations
-     * "fraction" should be interpreted relative to the new current source.
-     *
-     * <h5>Thread safety and blocking</h5>
-     * This method will be called concurrently with other methods (however there will not be
-     * multiple concurrent invocations of this method itself), and it is critical for it to be
-     * implemented in a thread-safe way (otherwise data loss is possible).
-     *
-     * <p>It is also very important that this method always completes quickly. In particular,
-     * it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating
-     * this requirement may stall completion of the work item or even cause it to fail.
-     *
-     * <p>It is incorrect to make both this method and {@link #start}/{@link #advance}
-     * {@code synchronized}, because those methods can perform blocking operations, and then
-     * this method would have to wait for those calls to complete.
-     *
-     * <p>{@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} makes it easy to implement
-     * this method safely and correctly.
-     *
-     * <p>By default, returns null to indicate that splitting is not possible.
-     *
-     * @param fraction the approximate fraction of the current range to keep in the primary
-     * @return the residual source, or {@code null} if the split was not performed
-     */
-    public BoundedSource<T> splitAtFraction(double fraction) {
-      return null;
-    }
-
-    /**
-     * By default, returns the minimum possible timestamp,
-     * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
-     */
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
deleted file mode 100644
index e3dca91..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Ints;
-
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-
-import java.io.IOException;
-import java.io.PushbackInputStream;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.NoSuchElementException;
-import java.util.zip.GZIPInputStream;
-
-/**
- * A Source that reads from compressed files. A {@code CompressedSource} wraps a delegate
- * {@link FileBasedSource} that is able to read the decompressed file format.
- *
- * <p>For example, use the following to read from a gzip-compressed XML file:
- *
- * <pre> {@code
- * XmlSource mySource = XmlSource.from(...);
- * PCollection<T> collection = p.apply(Read.from(CompressedSource
- *     .from(mySource)
- *     .withDecompression(CompressedSource.CompressionMode.GZIP)));
- * } </pre>
- *
- * <p>Supported compression algorithms are {@link CompressionMode#GZIP} and
- * {@link CompressionMode#BZIP2}. User-defined compression types are supported by implementing
- * {@link DecompressingChannelFactory}.
- *
- * <p>By default, the compression algorithm is selected from those supported in
- * {@link CompressionMode} based on the file name provided to the source, namely
- * {@code ".bz2"} indicates {@link CompressionMode#BZIP2} and {@code ".gz"} indicates
- * {@link CompressionMode#GZIP}. If the file name does not match any of the supported
- * algorithms, it is assumed to be uncompressed data.
- *
- * <p>Compressed input is not split: a file is only reported as splittable when the default
- * file-name-based detection considers it uncompressed.
- *
- * @param <T> The type to read from the compressed file.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class CompressedSource<T> extends FileBasedSource<T> {
-  /**
-   * Factory interface for creating channels that decompress the content of an underlying channel.
-   */
-  public static interface DecompressingChannelFactory extends Serializable {
-    /**
-     * Given a channel, create a channel that decompresses the content read from the channel.
-     *
-     * @throws IOException if the decompressing channel cannot be created
-     */
-    public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-        throws IOException;
-  }
-
-  /**
-   * Factory interface for creating channels that decompress the content of an underlying channel,
-   * based on both the channel and the file name.
-   */
-  private static interface FileNameBasedDecompressingChannelFactory
-      extends DecompressingChannelFactory {
-    /**
-     * Given a file name and a channel, create a channel that decompresses the content read from
-     * the channel.
-     *
-     * @throws IOException if the decompressing channel cannot be created
-     */
-    ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
-        throws IOException;
-
-    /**
-     * Given a file name, returns true if the file name matches any supported compression
-     * scheme.
-     */
-    boolean isCompressed(String fileName);
-  }
-
-  /**
-   * Default compression types supported by the {@code CompressedSource}.
-   */
-  public enum CompressionMode implements DecompressingChannelFactory {
-    /**
-     * Reads a byte channel assuming it is compressed with gzip.
-     */
-    GZIP {
-      @Override
-      public boolean matches(String fileName) {
-        return fileName.toLowerCase().endsWith(".gz");
-      }
-
-      @Override
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        // Determine if the input stream is gzipped. The input stream returned from the
-        // GCS connector may already be decompressed; GCS does this based on the
-        // content-encoding property.
-        PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
-        byte[] headerBytes = new byte[2];
-        int bytesRead = ByteStreams.read(
-            stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
-        stream.unread(headerBytes, 0, bytesRead);
-        if (bytesRead >= 2) {
-          byte zero = 0x00;
-          int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
-          if (header == GZIPInputStream.GZIP_MAGIC) {
-            // A gzip file may consist of several concatenated members; decompress all of them
-            // rather than silently stopping after the first one.
-            return Channels.newChannel(
-                new GzipCompressorInputStream(stream, true /* decompressConcatenated */));
-          }
-        }
-        return Channels.newChannel(stream);
-      }
-    },
-
-    /**
-     * Reads a byte channel assuming it is compressed with bzip2.
-     */
-    BZIP2 {
-      @Override
-      public boolean matches(String fileName) {
-        return fileName.toLowerCase().endsWith(".bz2");
-      }
-
-      @Override
-      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-          throws IOException {
-        // Multi-stream bzip2 files (e.g. produced by parallel compressors) must be read in full,
-        // not truncated after the first stream.
-        return Channels.newChannel(
-            new BZip2CompressorInputStream(
-                Channels.newInputStream(channel), true /* decompressConcatenated */));
-      }
-    };
-
-    /**
-     * Returns {@code true} if the given file name implies that the contents are compressed
-     * according to the compression embodied by this factory.
-     */
-    public abstract boolean matches(String fileName);
-
-    @Override
-    public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
-        throws IOException;
-  }
-
-  /**
-   * Reads a byte channel detecting compression according to the file name. If the file name
-   * does not match any known {@link CompressionMode}, it is presumed to be uncompressed.
-   */
-  private static class DecompressAccordingToFilename
-      implements FileNameBasedDecompressingChannelFactory {
-
-    @Override
-    public ReadableByteChannel createDecompressingChannel(
-        String fileName, ReadableByteChannel channel) throws IOException {
-      for (CompressionMode type : CompressionMode.values()) {
-        if (type.matches(fileName)) {
-          return type.createDecompressingChannel(channel);
-        }
-      }
-      // Uncompressed
-      return channel;
-    }
-
-    /**
-     * Always throws: this factory needs the file name to pick a decompression scheme.
-     */
-    @Override
-    public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) {
-      // Arguments are ordered to match the placeholders: the rejected single-argument
-      // signature first, then the supported (String, ReadableByteChannel) signature.
-      throw new UnsupportedOperationException(
-          String.format("%s does not support createDecompressingChannel(%s) but only"
-              + " createDecompressingChannel(%s,%s)",
-              getClass().getSimpleName(),
-              ReadableByteChannel.class.getSimpleName(),
-              String.class.getSimpleName(),
-              ReadableByteChannel.class.getSimpleName()));
-    }
-
-    @Override
-    public boolean isCompressed(String fileName) {
-      for (CompressionMode type : CompressionMode.values()) {
-        if (type.matches(fileName)) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-
-  private final FileBasedSource<T> sourceDelegate;
-  private final DecompressingChannelFactory channelFactory;
-
-  /**
-   * Creates a {@link Read} transform that reads from the underlying
-   * {@link FileBasedSource} {@code sourceDelegate} after decompressing it with a {@link
-   * DecompressingChannelFactory}.
-   */
-  public static <T> Read.Bounded<T> readFromSource(
-      FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    return Read.from(new CompressedSource<>(sourceDelegate, channelFactory));
-  }
-
-  /**
-   * Creates a {@code CompressedSource} from an underlying {@code FileBasedSource}. The type
-   * of compression used will be based on the file name extension unless explicitly
-   * configured via {@link CompressedSource#withDecompression}.
-   */
-  public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) {
-    return new CompressedSource<>(sourceDelegate, new DecompressAccordingToFilename());
-  }
-
-  /**
-   * Return a {@code CompressedSource} that is like this one but will decompress its underlying file
-   * with the given {@link DecompressingChannelFactory}.
-   */
-  public CompressedSource<T> withDecompression(DecompressingChannelFactory channelFactory) {
-    return new CompressedSource<>(this.sourceDelegate, channelFactory);
-  }
-
-  /**
-   * Creates a {@code CompressedSource} from a delegate file based source and a decompressing
-   * channel factory.
-   */
-  private CompressedSource(
-      FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    // Long.MAX_VALUE as the minimum bundle size prevents initial splitting within a file.
-    super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE);
-    this.sourceDelegate = sourceDelegate;
-    this.channelFactory = channelFactory;
-  }
-
-  /**
-   * Creates a {@code CompressedSource} for an individual file. Used by {@link
-   * CompressedSource#createForSubrangeOfFile}.
-   *
-   * @throws IllegalArgumentException if {@code startOffset} is not 0
-   */
-  private CompressedSource(FileBasedSource<T> sourceDelegate,
-      DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize,
-      long startOffset, long endOffset) {
-    super(filePatternOrSpec, minBundleSize, startOffset, endOffset);
-    Preconditions.checkArgument(
-        startOffset == 0,
-        "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
-    this.sourceDelegate = sourceDelegate;
-    this.channelFactory = channelFactory;
-  }
-
-  /**
-   * Validates that the delegate source is a valid source and that the channel factory is not null.
-   */
-  @Override
-  public void validate() {
-    super.validate();
-    Preconditions.checkNotNull(sourceDelegate);
-    sourceDelegate.validate();
-    Preconditions.checkNotNull(channelFactory);
-  }
-
-  /**
-   * Creates a {@code CompressedSource} for a subrange of a file. Called by superclass to create a
-   * source for a single file.
-   */
-  @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end),
-        channelFactory, fileName, Long.MAX_VALUE, start, end);
-  }
-
-  /**
-   * Determines whether a single file represented by this source is splittable. Returns true
-   * only if we are using the default decompression factory and it determines from the
-   * requested file name that the file is not compressed. A file read through an explicitly
-   * configured {@link DecompressingChannelFactory} is treated as compressed and is therefore
-   * never splittable.
-   */
-  @Override
-  protected final boolean isSplittable() throws Exception {
-    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
-          (FileNameBasedDecompressingChannelFactory) channelFactory;
-      return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec());
-    }
-    // An explicit factory always wraps the reader in a CompressedReader, which cannot be split
-    // (and sub-range sources must start at offset 0).
-    return false;
-  }
-
-  /**
-   * Creates a {@code FileBasedReader} to read a single file.
-   *
-   * <p>Uses the delegate source to create a single file reader for the delegate source.
-   * Utilizes the default decompression channel factory to not wrap the source reader
-   * if the file name does not represent a compressed file allowing for splitting of
-   * the source.
-   */
-  @Override
-  protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
-          (FileNameBasedDecompressingChannelFactory) channelFactory;
-      if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) {
-        return sourceDelegate.createSingleFileReader(options);
-      }
-    }
-    return new CompressedReader<T>(
-        this, sourceDelegate.createSingleFileReader(options));
-  }
-
-  /**
-   * Returns whether the delegate source produces sorted keys.
-   */
-  @Override
-  public final boolean producesSortedKeys(PipelineOptions options) throws Exception {
-    return sourceDelegate.producesSortedKeys(options);
-  }
-
-  /**
-   * Returns the delegate source's default output coder.
-   */
-  @Override
-  public final Coder<T> getDefaultOutputCoder() {
-    return sourceDelegate.getDefaultOutputCoder();
-  }
-
-  /**
-   * Returns the factory used to create decompressing channels for this source.
-   */
-  public final DecompressingChannelFactory getChannelFactory() {
-    return channelFactory;
-  }
-
-  /**
-   * Reader for a {@link CompressedSource}. Decompresses its input and uses a delegate
-   * reader to read elements from the decompressed input.
-   * @param <T> The type of records read from the source.
-   */
-  public static class CompressedReader<T> extends FileBasedReader<T> {
-
-    private final FileBasedReader<T> readerDelegate;
-    private final CompressedSource<T> source;
-    // long rather than int: an int would wrap on very large inputs and could make
-    // isAtSplitPoint() report a spurious split point after 2^32 records.
-    private long numRecordsRead;
-
-    /**
-     * Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader.
-     */
-    public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
-      super(source);
-      this.source = source;
-      this.readerDelegate = readerDelegate;
-    }
-
-    /**
-     * Gets the current record from the delegate reader.
-     */
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      return readerDelegate.getCurrent();
-    }
-
-    /**
-     * Returns true only for the first record; compressed sources cannot be split.
-     */
-    @Override
-    protected final boolean isAtSplitPoint() {
-      // We have to return true for the first record, but not for the state before reading it,
-      // and not for the state after reading any other record. Hence == rather than >= or <=.
-      // This is required because FileBasedReader is intended for readers that can read a range
-      // of offsets in a file and where the range can be split in parts. CompressedReader,
-      // however, is a degenerate case because it cannot be split, but it has to satisfy the
-      // semantics of offsets and split points anyway.
-      return numRecordsRead == 1;
-    }
-
-    /**
-     * Creates a decompressing channel from the input channel and passes it to its delegate reader's
-     * {@link FileBasedReader#startReading(ReadableByteChannel)}.
-     */
-    @Override
-    protected final void startReading(ReadableByteChannel channel) throws IOException {
-      if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) {
-        FileNameBasedDecompressingChannelFactory channelFactory =
-            (FileNameBasedDecompressingChannelFactory) source.getChannelFactory();
-        readerDelegate.startReading(channelFactory.createDecompressingChannel(
-            getCurrentSource().getFileOrPatternSpec(),
-            channel));
-      } else {
-        readerDelegate.startReading(source.getChannelFactory().createDecompressingChannel(
-            channel));
-      }
-    }
-
-    /**
-     * Reads the next record via the delegate reader.
-     */
-    @Override
-    protected final boolean readNextRecord() throws IOException {
-      if (!readerDelegate.readNextRecord()) {
-        return false;
-      }
-      ++numRecordsRead;
-      return true;
-    }
-
-    /**
-     * Returns the delegate reader's current offset in the decompressed input.
-     */
-    @Override
-    protected final long getCurrentOffset() {
-      return readerDelegate.getCurrentOffset();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
deleted file mode 100644
index 07609ba..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn;
-import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.common.base.Optional;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A {@link PTransform} that produces longs. When used to produce a
- * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
- * and counts up to a specified maximum. When used to produce an
- * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
- * and then never produces more output. (In practice, this limit should never be reached.)
- *
- * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
- * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
- * supports dynamic work rebalancing.
- *
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
- *
- * <pre>{@code
- * Pipeline p = ...
- * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
- * PCollection<Long> bounded = p.apply(producer);
- * }</pre>
- *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
- * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
- * with timestamps other than {@link Instant#now}. The unbounded transform can be limited with
- * {@link UnboundedCountingInput#withMaxNumRecords(long)} and/or
- * {@link UnboundedCountingInput#withMaxReadTime(Duration)}.
- *
- * <pre>{@code
- * Pipeline p = ...
- *
- * // To create an unbounded producer that uses processing time as the element timestamp.
- * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
- * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * PCollection<Long> unboundedWithTimestamps =
- *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
- * }</pre>
- */
-public class CountingInput {
-  /**
-   * Creates a {@link BoundedCountingInput} that will produce the specified number of elements,
-   * from {@code 0} to {@code numElements - 1}.
-   *
-   * @param numElements the number of elements to produce; must be greater than 0
-   * @throws IllegalArgumentException if {@code numElements} is not greater than 0
-   */
-  public static BoundedCountingInput upTo(long numElements) {
-    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
-    return new BoundedCountingInput(numElements);
-  }
-
-  /**
-   * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
-   * to {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
-   * timestamps corresponding to processing time at element generation, provided by
-   * {@link Instant#now}. Use the transform returned by
-   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
-   * timestamps.
-   */
-  public static UnboundedCountingInput unbounded() {
-    return new UnboundedCountingInput(
-        new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
-  }
-
-  /**
-   * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from
-   * {@code 0}.
-   */
-  public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    /** The number of elements to produce; validated as positive by {@link CountingInput#upTo}. */
-    private final long numElements;
-
-    private BoundedCountingInput(long numElements) {
-      this.numElements = numElements;
-    }
-
-    // CountingSource.upTo is deprecated in favor of this transform, which delegates to it.
-    @SuppressWarnings("deprecation")
-    @Override
-    public PCollection<Long> apply(PBegin begin) {
-      return begin.apply(Read.from(CountingSource.upTo(numElements)));
-    }
-  }
-
-  /**
-   * A {@link PTransform} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
-   * timestamps corresponding to processing time at element generation, provided by
-   * {@link Instant#now}. Use the transform returned by
-   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
-   * timestamps.
-   *
-   * <p>The {@code with*} methods never modify this transform; each returns a new one.
-   */
-  public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    /** Assigns the output timestamp of each generated number. */
-    private final SerializableFunction<Long, Instant> timestampFn;
-    /** If present, the maximum number of elements to read; makes the output bounded. */
-    private final Optional<Long> maxNumRecords;
-    /** If present, the maximum amount of time to read for; makes the output bounded. */
-    private final Optional<Duration> maxReadTime;
-
-    private UnboundedCountingInput(
-        SerializableFunction<Long, Instant> timestampFn,
-        Optional<Long> maxNumRecords,
-        Optional<Duration> maxReadTime) {
-      this.timestampFn = timestampFn;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the
-     * timestamp specified by the timestampFn.
-     *
-     * <p>Note that the timestamps produced by {@code timestampFn} must not decrease.
-     *
-     * @throws NullPointerException if {@code timestampFn} is {@code null}
-     */
-    public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
-      // Fail fast at construction time, consistent with withMaxReadTime, rather than when the
-      // pipeline first tries to timestamp an element.
-      checkNotNull(timestampFn, "TimestampFn cannot be null");
-      return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the
-     * specified number of elements.
-     *
-     * <p>A bounded number of elements will be produced by the result transform, and the result
-     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
-     *
-     * @throws IllegalArgumentException if {@code maxRecords} is not positive
-     */
-    public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
-      checkArgument(
-          maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
-      return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the
-     * specified amount of time.
-     *
-     * <p>A bounded number of elements will be produced by the result transform, and the result
-     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
-     *
-     * @throws NullPointerException if {@code readTime} is {@code null}
-     */
-    public UnboundedCountingInput withMaxReadTime(Duration readTime) {
-      checkNotNull(readTime, "ReadTime cannot be null");
-      return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
-    }
-
-    // CountingSource.unboundedWithTimestampFn is deprecated in favor of this transform.
-    @SuppressWarnings("deprecation")
-    @Override
-    public PCollection<Long> apply(PBegin begin) {
-      Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
-      // Apply whichever limits were configured; with neither, the output stays unbounded.
-      if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
-        return begin.apply(read);
-      } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
-        return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
-      } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
-        return begin.apply(read.withMaxReadTime(maxReadTime.get()));
-      } else {
-        return begin.apply(
-            read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
deleted file mode 100644
index 412f3a7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A source that produces longs. When used as a {@link BoundedSource}, {@link CountingSource}
- * starts at {@code 0} and counts up to a specified maximum. When used as an
- * {@link UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then never produces more
- * output. (In practice, this limit should never be reached.)
- *
- * <p>The bounded {@link CountingSource} is implemented based on {@link OffsetBasedSource} and
- * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
- * supports dynamic work rebalancing.
- *
- * <p>The factory methods on this class are deprecated; new code should use the transforms
- * provided by {@link CountingInput}.
- *
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
- *
- * <pre>{@code
- * Pipeline p = ...
- * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
- * PCollection<Long> bounded = p.apply(producer);
- * }</pre>
- *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
- * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
- * with timestamps other than {@link Instant#now}.
- *
- * <pre>{@code
- * Pipeline p = ...
- *
- * // To create an unbounded PCollection that uses processing time as the element timestamp.
- * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
- * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * PCollection<Long> unboundedWithTimestamps =
- *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
- * }</pre>
- */
-public class CountingSource {
-  /**
-   * Creates a {@link BoundedSource} that will produce the specified number of elements,
-   * from {@code 0} to {@code numElements - 1}.
-   *
-   * @param numElements the number of elements to produce; must be greater than 0
-   * @throws IllegalArgumentException if {@code numElements} is not greater than 0
-   * @deprecated use {@link CountingInput#upTo(long)} instead
-   */
-  @Deprecated
-  public static BoundedSource<Long> upTo(long numElements) {
-    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
-    return new BoundedCountingSource(0, numElements);
-  }
-
-  /**
-   * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will have timestamps
-   * corresponding to processing time at element generation, provided by {@link Instant#now}.
-   *
-   * @deprecated use {@link CountingInput#unbounded()} instead
-   */
-  @Deprecated
-  public static UnboundedSource<Long, CounterMark> unbounded() {
-    return unboundedWithTimestampFn(new NowTimestampFn());
-  }
-
-  /**
-   * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}, with element timestamps supplied by the specified function.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Note that the timestamps produced by {@code timestampFn} must not decrease.
-   *
-   * @deprecated use {@link CountingInput#unbounded()} and call
-   *     {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
-   */
-  @Deprecated
-  public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
-      SerializableFunction<Long, Instant> timestampFn) {
-    return new UnboundedCountingSource(0, 1, timestampFn);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////////////////////
-
-  /** Prevent instantiation. */
-  private CountingSource() {}
-
-  /**
-   * A function that returns {@link Instant#now} as the timestamp for each generated element.
-   */
-  static class NowTimestampFn implements SerializableFunction<Long, Instant> {
-    @Override
-    public Instant apply(Long input) {
-      return Instant.now();
-    }
-  }
-
-  /**
-   * An implementation of {@link CountingSource} that produces a bounded {@link PCollection}.
-   * It is implemented on top of {@link OffsetBasedSource} (with associated reader
-   * {@link BoundedCountingReader}) and performs efficient initial splitting and supports dynamic
-   * work rebalancing.
-   */
-  private static class BoundedCountingSource extends OffsetBasedSource<Long> {
-    /**
-     * Creates a {@link BoundedCountingSource} that generates the numbers in the specified
-     * {@code [start, end)} range.
-     */
-    public BoundedCountingSource(long start, long end) {
-      super(start, end, 1 /* can be split every 1 offset */);
-    }
-
-    ////////////////////////////////////////////////////////////////////////////////////////////
-
-    /**
-     * Each offset yields exactly one {@code long}, so the size estimate is 8 bytes per offset (the
-     * in-memory size of a {@code long}; the {@link VarLongCoder} encoding may be smaller).
-     */
-    @Override
-    public long getBytesPerOffset() {
-      return 8;
-    }
-
-    /** The range is fixed at construction, so the maximum end offset is the configured end. */
-    @Override
-    public long getMaxEndOffset(PipelineOptions options) throws Exception {
-      return getEndOffset();
-    }
-
-    @Override
-    public OffsetBasedSource<Long> createSourceForSubrange(long start, long end) {
-      return new BoundedCountingSource(start, end);
-    }
-
-    /** Numbers are emitted in increasing order, one per offset. */
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return true;
-    }
-
-    @Override
-    public com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader<Long> createReader(
-        PipelineOptions options) throws IOException {
-      return new BoundedCountingReader(this);
-    }
-
-    @Override
-    public Coder<Long> getDefaultOutputCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * The reader associated with {@link BoundedCountingSource}. The value emitted at each position
-   * is the offset itself.
-   *
-   * @see BoundedCountingSource
-   */
-  private static class BoundedCountingReader extends OffsetBasedSource.OffsetBasedReader<Long> {
-    /** The current offset, which is also the current element. */
-    private long current;
-
-    public BoundedCountingReader(OffsetBasedSource<Long> source) {
-      super(source);
-    }
-
-    @Override
-    protected long getCurrentOffset() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    public synchronized BoundedCountingSource getCurrentSource() {
-      return (BoundedCountingSource) super.getCurrentSource();
-    }
-
-    @Override
-    public Long getCurrent() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    protected boolean startImpl() throws IOException {
-      current = getCurrentSource().getStartOffset();
-      return true;
-    }
-
-    // Always returns true: this reader has no end of its own. NOTE(review): stopping at the end of
-    // the range is assumed to be done by OffsetBasedReader using getCurrentOffset(); confirm.
-    @Override
-    protected boolean advanceImpl() throws IOException {
-      current++;
-      return true;
-    }
-
-    @Override
-    public void close() throws IOException {}
-  }
-
-  /**
-   * An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
-   */
-  private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
-    /** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
-    private final long start;
-    /** The interval between numbers generated by this {@link UnboundedCountingSource}. */
-    private final long stride;
-    /** The function used to produce timestamps for the generated elements. */
-    private final SerializableFunction<Long, Instant> timestampFn;
-
-    /**
-     * Creates an {@link UnboundedCountingSource} that will produce the numbers
-     * {@code start, start + stride, start + 2 * stride, ...} up to {@link Long#MAX_VALUE}, with
-     * element timestamps supplied by the specified function.
-     *
-     * <p>Once the next number would exceed {@link Long#MAX_VALUE}, the source produces no more
-     * output. (In practice, this limit should never be reached.) {@code stride} is expected to be
-     * positive; the reader's overflow check relies on this.
-     *
-     * <p>{@code timestampFn} must produce non-decreasing timestamps: the reader's watermark is the
-     * timestamp of the most recently emitted number.
-     */
-    public UnboundedCountingSource(
-        long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
-      this.start = start;
-      this.stride = stride;
-      this.timestampFn = timestampFn;
-    }
-
-    /**
-     * Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
-     * {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
-     * produces.
-     *
-     * <p>E.g., if a source produces all even numbers {@code [0, 2, 4, 6, 8, ...)} and we want to
-     * split into 3 new sources, then the new sources will produce numbers that are 6 apart and
-     * are offset at the start by the original stride: {@code [0, 6, 12, ...)},
-     * {@code [2, 8, 14, ...)}, and {@code [4, 10, 16, ...)}.
-     */
-    @Override
-    public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      // Using Javadoc example, stride 2 with 3 splits becomes stride 6.
-      long newStride = stride * desiredNumSplits;
-
-      ImmutableList.Builder<UnboundedCountingSource> splits = ImmutableList.builder();
-      for (int i = 0; i < desiredNumSplits; ++i) {
-        // Starts offset by the original stride. Using Javadoc example, this generates starts of
-        // 0, 2, and 4.
-        splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
-      }
-      return splits.build();
-    }
-
-    @Override
-    public UnboundedReader<Long> createReader(
-        PipelineOptions options, CounterMark checkpointMark) {
-      return new UnboundedCountingReader(this, checkpointMark);
-    }
-
-    @Override
-    public Coder<CountingSource.CounterMark> getCheckpointMarkCoder() {
-      return AvroCoder.of(CountingSource.CounterMark.class);
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<Long> getDefaultOutputCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * The reader associated with {@link UnboundedCountingSource}.
-   *
-   * @see UnboundedCountingSource
-   */
-  private static class UnboundedCountingReader extends UnboundedReader<Long> {
-    private final UnboundedCountingSource source;
-    /** The last number emitted, or {@code source.start - source.stride} before the first. */
-    private long current;
-    /** The timestamp assigned to {@link #current} by the source's timestamp function. */
-    private Instant currentTimestamp;
-
-    public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
-      this.source = source;
-      if (mark == null) {
-        // Because we have not emitted an element yet, and start() calls advance, we need to
-        // "un-advance" so that start() produces the correct output.
-        this.current = source.start - source.stride;
-      } else {
-        // Resume after the last value recorded in the checkpoint.
-        this.current = mark.getLastEmitted();
-      }
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      // Overflow-safe check that (current + source.stride) <= LONG.MAX_VALUE. Else, stop producing.
-      // This form of the check assumes a positive stride.
-      if (Long.MAX_VALUE - source.stride < current) {
-        return false;
-      }
-      current += source.stride;
-      currentTimestamp = source.timestampFn.apply(current);
-      return true;
-    }
-
-    /**
-     * Returns the timestamp the source's timestamp function assigns to the most recently emitted
-     * number; this is why that function must not produce decreasing timestamps.
-     */
-    @Override
-    public Instant getWatermark() {
-      return source.timestampFn.apply(current);
-    }
-
-    @Override
-    public CounterMark getCheckpointMark() {
-      return new CounterMark(current);
-    }
-
-    @Override
-    public UnboundedSource<Long, CounterMark> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public Long getCurrent() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return currentTimestamp;
-    }
-
-    @Override
-    public void close() throws IOException {}
-  }
-
-  /**
-   * The checkpoint for an unbounded {@link CountingSource} is simply the last value produced. The
-   * associated source object encapsulates the information needed to produce the next value.
-   */
-  @DefaultCoder(AvroCoder.class)
-  public static class CounterMark implements UnboundedSource.CheckpointMark {
-    /** The last value emitted. */
-    private final long lastEmitted;
-
-    /**
-     * Creates a checkpoint mark reflecting the last emitted value.
-     */
-    public CounterMark(long lastEmitted) {
-      this.lastEmitted = lastEmitted;
-    }
-
-    /**
-     * Returns the last value emitted by the reader.
-     */
-    public long getLastEmitted() {
-      return lastEmitted;
-    }
-
-    /////////////////////////////////////////////////////////////////////////////////////
-
-    @SuppressWarnings("unused") // For AvroCoder
-    private CounterMark() {
-      this.lastEmitted = 0L;
-    }
-
-    /** Nothing to finalize: the mark holds only a value and no external resources. */
-    @Override
-    public void finalizeCheckpoint() throws IOException {}
-  }
-}