You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:45 UTC
[18/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 583af60..8102316 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -33,7 +33,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
@@ -50,10 +49,8 @@ import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -76,7 +73,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.joda.time.Instant;
@@ -86,43 +82,43 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Abstract class for file-based output. An implementation of FileBasedSink writes file-based output
- * and defines the format of output files (how values are written, headers/footers, MIME type,
- * etc.).
+ * Abstract class for file-based output. An implementation of FileBasedSink writes file-based
+ * output and defines the format of output files (how values are written, headers/footers, MIME
+ * type, etc.).
*
* <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
* and to create a {@link WriteOperation} that manages the process of writing to the sink.
*
* <p>The process of writing to file-based sink is as follows:
- *
* <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ * <li>An optional subclass-defined initialization,
+ * <li>a parallel write of bundles to temporary files, and finally,
+ * <li>these temporary files are renamed with final output filenames.
* </ol>
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
* event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
- * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
- * transform, so even redundant or retried bundles will have a unique way of identifying their
- * output.
+ * result passed to the finalize method. Each call to {@link Writer#openWindowed}
+ * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called
+ * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of
+ * identifying
+ * their output.
*
* <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
* guarantee is important; if a bundle is to be output to a file, for example, the name of the file
* will encode the unique bundle id to avoid conflicts with other writers.
*
- * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
- * filenames, and this policy object can be used to write windowed or triggered PCollections into
- * separate files per window pane. This allows file output from unbounded PCollections, and also
- * works for bounded PCollections.
+ * {@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
+ * filenames, and this policy object can be used to write windowed or triggered
+ * PCollections into separate files per window pane. This allows file output from unbounded
+ * PCollections, and also works for bounded PCollections.
*
* <p>Supported file systems are those registered with {@link FileSystems}.
*
- * @param <OutputT> the type of values written to the sink.
+ * @param <T> the type of values written to the sink.
*/
@Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
/**
@@ -177,7 +173,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
@Override
- public String getSuggestedFilenameSuffix() {
+ public String getFilenameSuffix() {
return filenameSuffix;
}
@@ -209,8 +205,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
- private final DynamicDestinations<?, DestinationT> dynamicDestinations;
-
/**
* The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
* underlying channel. The default is to not compress the output using
@@ -219,70 +213,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
private final WritableByteChannelFactory writableByteChannelFactory;
/**
- * A class that allows value-dependent writes in {@link FileBasedSink}.
- *
- * <p>Users can define a custom type to represent destinations, and provide a mapping to turn this
- * destination type into an instance of {@link FilenamePolicy}.
+ * A naming policy for output files.
*/
- @Experimental(Kind.FILESYSTEM)
- public abstract static class DynamicDestinations<UserT, DestinationT>
- implements HasDisplayData, Serializable {
- /**
- * Returns an object that represents at a high level the destination being written to. May not
- * return null.
- */
- public abstract DestinationT getDestination(UserT element);
-
- /**
- * Returns the default destination. This is used for collections that have no elements as the
- * destination to write empty files to.
- */
- public abstract DestinationT getDefaultDestination();
-
- /**
- * Returns the coder for {@link DestinationT}. If this is not overridden, then the coder
- * registry will be used to find a suitable coder. This must be a deterministic coder, as {@link
- * DestinationT} will be used as a key type in a {@link
- * org.apache.beam.sdk.transforms.GroupByKey}.
- */
- @Nullable
- public Coder<DestinationT> getDestinationCoder() {
- return null;
- }
-
- /** Converts a destination into a {@link FilenamePolicy}. May not return null. */
- public abstract FilenamePolicy getFilenamePolicy(DestinationT destination);
-
- /** Populates the display data. */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {}
-
- // Gets the destination coder. If the user does not provide one, try to find one in the coder
- // registry. If no coder can be found, throws CannotProvideCoderException.
- final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
- throws CannotProvideCoderException {
- Coder<DestinationT> destinationCoder = getDestinationCoder();
- if (destinationCoder != null) {
- return destinationCoder;
- }
- // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
- // We must first use reflection to figure out what the type parameter is.
- TypeDescriptor<?> superDescriptor =
- TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
- if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
- throw new AssertionError(
- "Couldn't find the DynamicDestinations superclass of " + this.getClass());
- }
- TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
- @SuppressWarnings("unchecked")
- TypeDescriptor<DestinationT> descriptor =
- (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
- return registry.getCoder(descriptor);
- }
- }
-
- /** A naming policy for output files. */
- @Experimental(Kind.FILESYSTEM)
public abstract static class FilenamePolicy implements Serializable {
/**
* Context used for generating a name based on shard number, and num shards.
@@ -355,28 +287,29 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
/**
* When a sink has requested windowed or triggered output, this method will be invoked to return
* the file {@link ResourceId resource} to be created given the base output directory and a
- * {@link OutputFileHints} containing information about the file, including a suggested
- * extension (e.g. coming from {@link CompressionType}).
+ * (possibly empty) extension from {@link FileBasedSink} configuration
+ * (e.g., {@link CompressionType}).
*
- * <p>The {@link WindowedContext} object gives access to the window and pane, as well as
- * sharding information. The policy must return unique and consistent filenames for different
- * windows and panes.
+ * <p>The {@link WindowedContext} object gives access to the window and pane,
+ * as well as sharding information. The policy must return unique and consistent filenames
+ * for different windows and panes.
*/
@Experimental(Kind.FILESYSTEM)
- public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints);
+ public abstract ResourceId windowedFilename(
+ ResourceId outputDirectory, WindowedContext c, String extension);
/**
* When a sink has not requested windowed or triggered output, this method will be invoked to
* return the file {@link ResourceId resource} to be created given the base output directory and
- * a {@link OutputFileHints} containing information about the file, including a suggested
- * extension (e.g. coming from {@link CompressionType}).
+ * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration
+ * (e.g., {@link CompressionType}).
*
* <p>The {@link Context} object only provides sharding information, which is used by the policy
* to generate unique and consistent filenames.
*/
@Experimental(Kind.FILESYSTEM)
- @Nullable
- public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints);
+ @Nullable public abstract ResourceId unwindowedFilename(
+ ResourceId outputDirectory, Context c, String extension);
/**
* Populates the display data.
@@ -385,8 +318,19 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
+ /** The policy used to generate names of files to be produced. */
+ private final FilenamePolicy filenamePolicy;
/** The directory to which files will be written. */
- private final ValueProvider<ResourceId> tempDirectoryProvider;
+ private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
+
+ /**
+ * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
+ */
+ @Experimental(Kind.FILESYSTEM)
+ public FileBasedSink(
+ ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
+ this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
+ }
private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
@Override
@@ -396,91 +340,95 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
/**
- * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files.
+ * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
*/
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
- ValueProvider<ResourceId> tempDirectoryProvider,
- DynamicDestinations<?, DestinationT> dynamicDestinations) {
- this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
- }
-
- /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
- @Experimental(Kind.FILESYSTEM)
- public FileBasedSink(
- ValueProvider<ResourceId> tempDirectoryProvider,
- DynamicDestinations<?, DestinationT> dynamicDestinations,
+ ValueProvider<ResourceId> baseOutputDirectoryProvider,
+ FilenamePolicy filenamePolicy,
WritableByteChannelFactory writableByteChannelFactory) {
- this.tempDirectoryProvider =
- NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
- this.dynamicDestinations = checkNotNull(dynamicDestinations);
+ this.baseOutputDirectoryProvider =
+ NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory());
+ this.filenamePolicy = filenamePolicy;
this.writableByteChannelFactory = writableByteChannelFactory;
}
- /** Return the {@link DynamicDestinations} used. */
- @SuppressWarnings("unchecked")
- public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() {
- return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
+ /**
+ * Returns the base directory inside which files will be written according to the configured
+ * {@link FilenamePolicy}.
+ */
+ @Experimental(Kind.FILESYSTEM)
+ public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
+ return baseOutputDirectoryProvider;
}
/**
- * Returns the directory inside which temporary files will be written according to the configured
- * {@link FilenamePolicy}.
+ * Returns the policy by which files will be named inside of the base output directory. Note that
+ * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
+ * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
*/
@Experimental(Kind.FILESYSTEM)
- public ValueProvider<ResourceId> getTempDirectoryProvider() {
- return tempDirectoryProvider;
+ public final FilenamePolicy getFilenamePolicy() {
+ return filenamePolicy;
}
public void validate(PipelineOptions options) {}
- /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */
- public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
+ /**
+ * Return a subclass of {@link WriteOperation} that will manage the write
+ * to the sink.
+ */
+ public abstract WriteOperation<T> createWriteOperation();
public void populateDisplayData(DisplayData.Builder builder) {
- getDynamicDestinations().populateDisplayData(builder);
+ getFilenamePolicy().populateDisplayData(builder);
}
/**
* Abstract operation that manages the process of writing to {@link FileBasedSink}.
*
- * <p>The primary responsibility of the WriteOperation is the management of output files. During
- * a write, {@link Writer}s write bundles to temporary file locations. After the bundles have been
- * written,
- *
+ * <p>The primary responsibility of the WriteOperation is the management of output
+ * files. During a write, {@link Writer}s write bundles to temporary file
+ * locations. After the bundles have been written,
* <ol>
- * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
- * output bundles.
- * <li>During finalize, these temporary files are copied to final output locations and named
- * according to a file naming template.
- * <li>Finally, any temporary files that were created during the write are removed.
+ * <li>{@link WriteOperation#finalize} is given a list of the temporary
+ * files containing the output bundles.
+ * <li>During finalize, these temporary files are copied to final output locations and named
+ * according to a file naming template.
+ * <li>Finally, any temporary files that were created during the write are removed.
* </ol>
*
- * <p>Subclass implementations of WriteOperation must implement {@link
- * WriteOperation#createWriter} to return a concrete FileBasedSinkWriter.
+ * <p>Subclass implementations of WriteOperation must implement
+ * {@link WriteOperation#createWriter} to return a concrete
+ * FileBasedSinkWriter.
*
- * <h2>Temporary and Output File Naming:</h2>
+ * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
+ * files using the tempDirectory that can be provided via the constructor of
+ * WriteOperation. These temporary files will be named
+ * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
+ * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
+ * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
*
- * <p>During the write, bundles are written to temporary files using the tempDirectory that can be
- * provided via the constructor of WriteOperation. These temporary files will be named {@code
- * {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. For example, if
- * tempDirectory is "gs://my-bucket/my_temp_output", the output for a bundle with bundle id 15723
- * will be "gs://my-bucket/my_temp_output/15723".
+ * <p>Final output files are written to baseOutputFilename with the format
+ * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
+ * written and extension is the file extension. Both baseOutputFilename and extension are required
+ * constructor arguments.
*
- * <p>Final output files are written to the location specified by the {@link FilenamePolicy}. If
- * no filename policy is specified, then the {@link DefaultFilenamePolicy} will be used. The
- * directory that the files are written to is determined by the {@link FilenamePolicy} instance.
+ * <p>Subclass implementations can change the file naming template by supplying a value for
+ * fileNamingTemplate.
*
* <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
* files will occur.
*
* <p>If there are no elements in the PCollection being written, no output will be generated.
*
- * @param <OutputT> the type of values written to the sink.
+ * @param <T> the type of values written to the sink.
*/
- public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable {
- /** The Sink that this WriteOperation will write to. */
- protected final FileBasedSink<OutputT, DestinationT> sink;
+ public abstract static class WriteOperation<T> implements Serializable {
+ /**
+ * The Sink that this WriteOperation will write to.
+ */
+ protected final FileBasedSink<T> sink;
/** Directory for temporary output files. */
protected final ValueProvider<ResourceId> tempDirectory;
@@ -497,19 +445,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
/**
- * Constructs a WriteOperation using the default strategy for generating a temporary directory
- * from the base output filename.
+ * Constructs a WriteOperation using the default strategy for generating a temporary
+ * directory from the base output filename.
*
- * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if
- * tempDirectory is /path/to/foo/, the temporary directory will be
- * /path/to/foo/temp-beam-foo-$date.
+ * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
+ * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
- public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
- this(
- sink,
- NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder()));
+ public WriteOperation(FileBasedSink<T> sink) {
+ this(sink, NestedValueProvider.of(
+ sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
}
private static class TemporaryDirectoryBuilder
@@ -525,12 +471,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
private final Long tempId = TEMP_COUNT.getAndIncrement();
@Override
- public ResourceId apply(ResourceId tempDirectory) {
+ public ResourceId apply(ResourceId baseOutputDirectory) {
// Temp directory has a timestamp and a unique ID
String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
- return tempDirectory
- .getCurrentDirectory()
- .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
+ return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
}
}
@@ -541,22 +485,22 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* @param tempDirectory the base directory to be used for temporary output files.
*/
@Experimental(Kind.FILESYSTEM)
- public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) {
+ public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
this(sink, StaticValueProvider.of(tempDirectory));
}
private WriteOperation(
- FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) {
+ FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
this.sink = sink;
this.tempDirectory = tempDirectory;
this.windowedWrites = false;
}
/**
- * Clients must implement to return a subclass of {@link Writer}. This method must not mutate
- * the state of the object.
+ * Clients must implement to return a subclass of {@link Writer}. This
+ * method must not mutate the state of the object.
*/
- public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
+ public abstract Writer<T> createWriter() throws Exception;
/**
* Indicates that the operation will be performing windowed writes.
@@ -570,8 +514,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* removing temporary files.
*
* <p>Finalization may be overridden by subclass implementations to perform customized
- * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
- * writerResults} contains the filenames of written bundles.
+ * finalization (e.g., initiating some operation on output bundles, merging them, etc.).
+ * {@code writerResults} contains the filenames of written bundles.
*
* <p>If subclasses override this method, they must guarantee that its implementation is
* idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
@@ -579,7 +523,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*
* @param writerResults the results of writes (FileResult).
*/
- public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception {
+ public void finalize(Iterable<FileResult> writerResults) throws Exception {
// Collect names of temporary files and rename them.
Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
copyToOutputFiles(outputFilenames);
@@ -598,14 +542,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
@Experimental(Kind.FILESYSTEM)
protected final Map<ResourceId, ResourceId> buildOutputFilenames(
- Iterable<FileResult<DestinationT>> writerResults) {
+ Iterable<FileResult> writerResults) {
int numShards = Iterables.size(writerResults);
Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
+ FilenamePolicy policy = getSink().getFilenamePolicy();
+ ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get();
+
// Either all results have a shard number set (if the sink is configured with a fixed
// number of shards), or they all don't (otherwise).
Boolean isShardNumberSetEverywhere = null;
- for (FileResult<DestinationT> result : writerResults) {
+ for (FileResult result : writerResults) {
boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
if (isShardNumberSetEverywhere == null) {
isShardNumberSetEverywhere = isShardNumberSetHere;
@@ -621,7 +568,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
isShardNumberSetEverywhere = true;
}
- List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
+ List<FileResult> resultsWithShardNumbers = Lists.newArrayList();
if (isShardNumberSetEverywhere) {
resultsWithShardNumbers = Lists.newArrayList(writerResults);
} else {
@@ -630,32 +577,29 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
// case of triggers, the list of FileResult objects in the Finalize iterable is not
// deterministic, and might change over retries. This breaks the assumption below that
// sorting the FileResult objects provides idempotency.
- List<FileResult<DestinationT>> sortedByTempFilename =
+ List<FileResult> sortedByTempFilename =
Ordering.from(
- new Comparator<FileResult<DestinationT>>() {
- @Override
- public int compare(
- FileResult<DestinationT> first, FileResult<DestinationT> second) {
- String firstFilename = first.getTempFilename().toString();
- String secondFilename = second.getTempFilename().toString();
- return firstFilename.compareTo(secondFilename);
- }
- })
+ new Comparator<FileResult>() {
+ @Override
+ public int compare(FileResult first, FileResult second) {
+ String firstFilename = first.getTempFilename().toString();
+ String secondFilename = second.getTempFilename().toString();
+ return firstFilename.compareTo(secondFilename);
+ }
+ })
.sortedCopy(writerResults);
for (int i = 0; i < sortedByTempFilename.size(); i++) {
resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
}
}
- for (FileResult<DestinationT> result : resultsWithShardNumbers) {
+ for (FileResult result : resultsWithShardNumbers) {
checkArgument(
result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
outputFilenames.put(
result.getTempFilename(),
result.getDestinationFile(
- getSink().getDynamicDestinations(),
- numShards,
- getSink().getWritableByteChannelFactory()));
+ policy, baseOutputDir, numShards, getSink().getExtension()));
}
int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
@@ -671,18 +615,18 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*
* <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
*
- * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
- * will be the same as the sorted order of the input filenames. In other words (when using
- * {@link DefaultFilenamePolicy}), if the input filenames are ["C", "A", "B"], baseFilename (int
- * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is
- * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B
- * will be copied to dir/file-001-of-003.txt, etc.
+ * <p>Files will be named according to the file naming template. The order of the output files
+ * will be the same as the sorted order of the input filenames. In other words, if the input
+ * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and
+ * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to
+ * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
*
* @param filenames the filenames of temporary files.
*/
@VisibleForTesting
@Experimental(Kind.FILESYSTEM)
- final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
+ final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
+ throws IOException {
int numFiles = filenames.size();
if (numFiles > 0) {
LOG.debug("Copying {} files.", numFiles);
@@ -754,8 +698,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
- /** Returns the FileBasedSink for this write operation. */
- public FileBasedSink<OutputT, DestinationT> getSink() {
+ /**
+ * Returns the FileBasedSink for this write operation.
+ */
+ public FileBasedSink<T> getSink() {
return sink;
}
@@ -773,28 +719,33 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
- /** Returns the {@link WritableByteChannelFactory} used. */
- protected final WritableByteChannelFactory getWritableByteChannelFactory() {
- return writableByteChannelFactory;
+ /** Returns the extension that will be written to the produced files. */
+ protected final String getExtension() {
+ String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), "");
+ if (!extension.isEmpty() && !extension.startsWith(".")) {
+ extension = "." + extension;
+ }
+ return extension;
}
/**
- * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass implementations
- * provide a method that can write a single value to a {@link WritableByteChannel}.
+ * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
+ * implementations provide a method that can write a single value to a
+ * {@link WritableByteChannel}.
*
* <p>Subclass implementations may also override methods that write headers and footers before and
* after the values in a bundle, respectively, as well as provide a MIME type for the output
* channel.
*
- * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore any
- * access to static members or methods should be thread safe.
+ * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore
+ * any access to static members or methods should be thread safe.
*
- * @param <OutputT> the type of values to write.
+ * @param <T> the type of values to write.
*/
- public abstract static class Writer<OutputT, DestinationT> {
+ public abstract static class Writer<T> {
private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
- private final WriteOperation<OutputT, DestinationT> writeOperation;
+ private final WriteOperation<T> writeOperation;
/** Unique id for this output bundle. */
private String id;
@@ -802,7 +753,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
private BoundedWindow window;
private PaneInfo paneInfo;
private int shard = -1;
- private DestinationT destination;
/** The output file for this bundle. May be null if opening failed. */
private @Nullable ResourceId outputFile;
@@ -822,8 +772,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*/
private final String mimeType;
- /** Construct a new {@link Writer} that will produce files of the given MIME type. */
- public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) {
+ /**
+ * Construct a new {@link Writer} that will produce files of the given MIME type.
+ */
+ public Writer(WriteOperation<T> writeOperation, String mimeType) {
checkNotNull(writeOperation);
this.writeOperation = writeOperation;
this.mimeType = mimeType;
@@ -866,29 +818,28 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* id populated for the case of static sharding. In cases where the runner is dynamically
* picking sharding, shard might be set to -1.
*/
- public final void openWindowed(
- String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
+ public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard)
throws Exception {
if (!getWriteOperation().windowedWrites) {
throw new IllegalStateException("openWindowed called a non-windowed sink.");
}
- open(uId, window, paneInfo, shard, destination);
+ open(uId, window, paneInfo, shard);
}
/**
* Called for each value in the bundle.
*/
- public abstract void write(OutputT value) throws Exception;
+ public abstract void write(T value) throws Exception;
/**
- * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
+ * Similar to {@link #openWindowed} however for the case where unwindowed writes were
+ * requested.
*/
- public final void openUnwindowed(String uId, int shard, DestinationT destination)
- throws Exception {
+ public final void openUnwindowed(String uId, int shard) throws Exception {
if (getWriteOperation().windowedWrites) {
throw new IllegalStateException("openUnwindowed called a windowed sink.");
}
- open(uId, null, null, shard, destination);
+ open(uId, null, null, shard);
}
// Helper function to close a channel, on exception cases.
@@ -904,18 +855,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
- private void open(
- String uId,
- @Nullable BoundedWindow window,
- @Nullable PaneInfo paneInfo,
- int shard,
- DestinationT destination)
- throws Exception {
+ private void open(String uId,
+ @Nullable BoundedWindow window,
+ @Nullable PaneInfo paneInfo,
+ int shard) throws Exception {
this.id = uId;
this.window = window;
this.paneInfo = paneInfo;
this.shard = shard;
- this.destination = destination;
ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
verifyNotNull(
@@ -961,7 +908,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
/** Closes the channel and returns the bundle result. */
- public final FileResult<DestinationT> close() throws Exception {
+ public final FileResult close() throws Exception {
checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
LOG.debug("Writing footer to {}.", outputFile);
@@ -991,41 +938,35 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
}
- FileResult<DestinationT> result =
- new FileResult<>(outputFile, shard, window, paneInfo, destination);
+ FileResult result = new FileResult(outputFile, shard, window, paneInfo);
LOG.debug("Result for bundle {}: {}", this.id, outputFile);
return result;
}
- /** Return the WriteOperation that this Writer belongs to. */
- public WriteOperation<OutputT, DestinationT> getWriteOperation() {
+ /**
+ * Return the WriteOperation that this Writer belongs to.
+ */
+ public WriteOperation<T> getWriteOperation() {
return writeOperation;
}
}
/**
- * Result of a single bundle write. Contains the filename produced by the bundle, and if known the
- * final output filename.
+ * Result of a single bundle write. Contains the filename produced by the bundle, and if known
+ * the final output filename.
*/
- public static final class FileResult<DestinationT> {
+ public static final class FileResult {
private final ResourceId tempFilename;
private final int shard;
private final BoundedWindow window;
private final PaneInfo paneInfo;
- private final DestinationT destination;
@Experimental(Kind.FILESYSTEM)
- public FileResult(
- ResourceId tempFilename,
- int shard,
- BoundedWindow window,
- PaneInfo paneInfo,
- DestinationT destination) {
+ public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
this.tempFilename = tempFilename;
this.shard = shard;
this.window = window;
this.paneInfo = paneInfo;
- this.destination = destination;
}
@Experimental(Kind.FILESYSTEM)
@@ -1037,8 +978,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
return shard;
}
- public FileResult<DestinationT> withShard(int shard) {
- return new FileResult<>(tempFilename, shard, window, paneInfo, destination);
+ public FileResult withShard(int shard) {
+ return new FileResult(tempFilename, shard, window, paneInfo);
}
public BoundedWindow getWindow() {
@@ -1049,24 +990,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
return paneInfo;
}
- public DestinationT getDestination() {
- return destination;
- }
-
@Experimental(Kind.FILESYSTEM)
- public ResourceId getDestinationFile(
- DynamicDestinations<?, DestinationT> dynamicDestinations,
- int numShards,
- OutputFileHints outputFileHints) {
+ public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
+ int numShards, String extension) {
checkArgument(getShard() != UNKNOWN_SHARDNUM);
checkArgument(numShards > 0);
- FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
if (getWindow() != null) {
- return policy.windowedFilename(
- new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards),
- outputFileHints);
+ return policy.windowedFilename(outputDirectory, new WindowedContext(
+ getWindow(), getPaneInfo(), getShard(), numShards), extension);
} else {
- return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints);
+ return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards),
+ extension);
}
}
@@ -1080,24 +1014,22 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
- /** A coder for {@link FileResult} objects. */
- public static final class FileResultCoder<DestinationT>
- extends StructuredCoder<FileResult<DestinationT>> {
+ /**
+ * A coder for {@link FileResult} objects.
+ */
+ public static final class FileResultCoder extends StructuredCoder<FileResult> {
private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of();
private static final Coder<Integer> SHARD_CODER = VarIntCoder.of();
private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE);
+
private final Coder<BoundedWindow> windowCoder;
- private final Coder<DestinationT> destinationCoder;
- protected FileResultCoder(
- Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
+ protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
this.windowCoder = NullableCoder.of(windowCoder);
- this.destinationCoder = destinationCoder;
}
- public static <DestinationT> FileResultCoder<DestinationT> of(
- Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
- return new FileResultCoder<>(windowCoder, destinationCoder);
+ public static FileResultCoder of(Coder<BoundedWindow> windowCoder) {
+ return new FileResultCoder(windowCoder);
}
@Override
@@ -1106,7 +1038,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
@Override
- public void encode(FileResult<DestinationT> value, OutputStream outStream) throws IOException {
+ public void encode(FileResult value, OutputStream outStream)
+ throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
@@ -1114,22 +1047,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
windowCoder.encode(value.getWindow(), outStream);
PANE_INFO_CODER.encode(value.getPaneInfo(), outStream);
SHARD_CODER.encode(value.getShard(), outStream);
- destinationCoder.encode(value.getDestination(), outStream);
}
@Override
- public FileResult<DestinationT> decode(InputStream inStream) throws IOException {
+ public FileResult decode(InputStream inStream)
+ throws IOException {
String tempFilename = FILENAME_CODER.decode(inStream);
BoundedWindow window = windowCoder.decode(inStream);
PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream);
int shard = SHARD_CODER.decode(inStream);
- DestinationT destination = destinationCoder.decode(inStream);
- return new FileResult<>(
- FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
- shard,
- window,
- paneInfo,
- destination);
+ return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
+ shard, window, paneInfo);
}
@Override
@@ -1138,15 +1066,25 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
windowCoder.verifyDeterministic();
PANE_INFO_CODER.verifyDeterministic();
SHARD_CODER.verifyDeterministic();
- destinationCoder.verifyDeterministic();
}
}
/**
- * Provides hints about how to generate output files, such as a suggested filename suffix (e.g.
- * based on the compression type), and the file MIME type.
+ * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
+ * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
+ * would normally be written directly to the {@link WritableByteChannel} passed into
+ * {@link WritableByteChannelFactory#create(WritableByteChannel)}.
+ *
+ * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
+ * building {@link DisplayData}.
*/
- public interface OutputFileHints extends Serializable {
+ public interface WritableByteChannelFactory extends Serializable {
+ /**
+ * @param channel the {@link WritableByteChannel} to wrap
+ * @return the {@link WritableByteChannel} to be used during output
+ */
+ WritableByteChannel create(WritableByteChannel channel) throws IOException;
+
/**
* Returns the MIME type that should be used for the files that will hold the output data. May
* return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
@@ -1163,23 +1101,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP}
*/
@Nullable
- String getSuggestedFilenameSuffix();
- }
-
- /**
- * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
- * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
- * would normally be written directly to the {@link WritableByteChannel} passed into {@link
- * WritableByteChannelFactory#create(WritableByteChannel)}.
- *
- * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
- * building {@link DisplayData}.
- */
- public interface WritableByteChannelFactory extends OutputFileHints {
- /**
- * @param channel the {@link WritableByteChannel} to wrap
- * @return the {@link WritableByteChannel} to be used during output
- */
- WritableByteChannel create(WritableByteChannel channel) throws IOException;
+ String getFilenameSuffix();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index c3687a9..05f0d97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
-import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.io.range.RangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -111,7 +110,8 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
@Override
public List<? extends OffsetBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- // Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle
+ // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
+ // make sure that we do not end up with a too small bundle at the end. If the desired bundle
// size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
long desiredBundleSizeOffsetUnits = Math.max(
@@ -119,10 +119,20 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
minBundleSize);
List<OffsetBasedSource<T>> subSources = new ArrayList<>();
- for (OffsetRange range :
- new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options)))
- .split(desiredBundleSizeOffsetUnits, minBundleSize)) {
- subSources.add(createSourceForSubrange(range.getFrom(), range.getTo()));
+ long start = startOffset;
+ long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
+
+ while (start < maxEnd) {
+ long end = start + desiredBundleSizeOffsetUnits;
+ end = Math.min(end, maxEnd);
+ // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
+ long remaining = maxEnd - end;
+ if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
+ end = maxEnd;
+ }
+ subSources.add(createSourceForSubrange(start, end));
+
+ start = end;
}
return subSources;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 6e7b243..e288075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
@@ -356,11 +355,12 @@ public class TFRecordIO {
public PDone expand(PCollection<byte[]> input) {
checkState(getOutputPrefix() != null,
"need to set the output prefix of a TFRecordIO.Write transform");
- WriteFiles<byte[], Void, byte[]> write =
- WriteFiles.<byte[], Void, byte[]>to(
+ WriteFiles<byte[]> write = WriteFiles.to(
new TFRecordSink(
- getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()),
- SerializableFunctions.<byte[]>identity());
+ getOutputPrefix(),
+ getShardTemplate(),
+ getFilenameSuffix(),
+ getCompressionType()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -546,20 +546,20 @@ public class TFRecordIO {
}
}
- /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */
+ /**
+ * A {@link FileBasedSink} for TFRecord files. Produces TFRecord files.
+ */
@VisibleForTesting
- static class TFRecordSink extends FileBasedSink<byte[], Void> {
+ static class TFRecordSink extends FileBasedSink<byte[]> {
@VisibleForTesting
- TFRecordSink(
- ValueProvider<ResourceId> outputPrefix,
+ TFRecordSink(ValueProvider<ResourceId> outputPrefix,
@Nullable String shardTemplate,
@Nullable String suffix,
TFRecordIO.CompressionType compressionType) {
super(
outputPrefix,
- DynamicFileDestinations.constant(
- DefaultFilenamePolicy.fromStandardParameters(
- outputPrefix, shardTemplate, suffix, false)),
+ DefaultFilenamePolicy.constructUsingStandardParameters(
+ outputPrefix, shardTemplate, suffix, false),
writableByteChannelFactory(compressionType));
}
@@ -571,7 +571,7 @@ public class TFRecordIO {
}
@Override
- public WriteOperation<byte[], Void> createWriteOperation() {
+ public WriteOperation<byte[]> createWriteOperation() {
return new TFRecordWriteOperation(this);
}
@@ -590,24 +590,30 @@ public class TFRecordIO {
return CompressionType.UNCOMPRESSED;
}
- /** A {@link WriteOperation WriteOperation} for TFRecord files. */
- private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> {
+ /**
+ * A {@link WriteOperation
+ * WriteOperation} for TFRecord files.
+ */
+ private static class TFRecordWriteOperation extends WriteOperation<byte[]> {
private TFRecordWriteOperation(TFRecordSink sink) {
super(sink);
}
@Override
- public Writer<byte[], Void> createWriter() throws Exception {
+ public Writer<byte[]> createWriter() throws Exception {
return new TFRecordWriter(this);
}
}
- /** A {@link Writer Writer} for TFRecord files. */
- private static class TFRecordWriter extends Writer<byte[], Void> {
+ /**
+ * A {@link Writer Writer}
+ * for TFRecord files.
+ */
+ private static class TFRecordWriter extends Writer<byte[]> {
private WritableByteChannel outChannel;
private TFRecordCodec codec;
- private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
+ private TFRecordWriter(WriteOperation<byte[]> writeOperation) {
super(writeOperation, MimeTypes.BINARY);
}