You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/11 01:24:06 UTC
[3/5] beam git commit: Adds DynamicDestinations support to
FileBasedSink
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
new file mode 100644
index 0000000..e7ef0f6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
+public class DynamicFileDestinations {
+ /** Always returns a constant {@link FilenamePolicy}. */
+ private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+ private final FilenamePolicy filenamePolicy;
+
+ public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
+ this.filenamePolicy = filenamePolicy;
+ }
+
+ @Override
+ public Void getDestination(T element) {
+ return (Void) null;
+ }
+
+ @Override
+ public Coder<Void> getDestinationCoder() {
+ return null;
+ }
+
+ @Override
+ public Void getDefaultDestination() {
+ return (Void) null;
+ }
+
+ @Override
+ public FilenamePolicy getFilenamePolicy(Void destination) {
+ return filenamePolicy;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ filenamePolicy.populateDisplayData(builder);
+ }
+ }
+
+ /**
+ * A base class for a {@link DynamicDestinations} object that returns differently-configured
+ * instances of {@link DefaultFilenamePolicy}.
+ */
+ private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
+ SerializableFunction<UserT, Params> destinationFunction;
+ Params emptyDestination;
+
+ public DefaultPolicyDestinations(
+ SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+ this.destinationFunction = destinationFunction;
+ this.emptyDestination = emptyDestination;
+ }
+
+ @Override
+ public Params getDestination(UserT element) {
+ return destinationFunction.apply(element);
+ }
+
+ @Override
+ public Params getDefaultDestination() {
+ return emptyDestination;
+ }
+
+ @Nullable
+ @Override
+ public Coder<Params> getDestinationCoder() {
+ return ParamsCoder.of();
+ }
+
+ @Override
+ public FilenamePolicy getFilenamePolicy(DefaultFilenamePolicy.Params params) {
+ return DefaultFilenamePolicy.fromParams(params);
+ }
+ }
+
+ /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
+ public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
+ return new ConstantFilenamePolicy<>(filenamePolicy);
+ }
+
+ /**
+ * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
+ * configured with the given {@link Params}.
+ */
+ public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
+ SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+ return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 8102316..583af60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.lang.reflect.TypeVariable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
@@ -49,8 +50,10 @@ import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -73,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.joda.time.Instant;
@@ -82,43 +86,43 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Abstract class for file-based output. An implementation of FileBasedSink writes file-based
- * output and defines the format of output files (how values are written, headers/footers, MIME
- * type, etc.).
+ * Abstract class for file-based output. An implementation of FileBasedSink writes file-based output
+ * and defines the format of output files (how values are written, headers/footers, MIME type,
+ * etc.).
*
* <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
* and to create a {@link WriteOperation} that manages the process of writing to the sink.
*
* <p>The process of writing to file-based sink is as follows:
+ *
* <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ * <li>An optional subclass-defined initialization,
+ * <li>a parallel write of bundles to temporary files, and finally,
+ * <li>these temporary files are renamed with final output filenames.
* </ol>
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
* event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed}
- * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called
- * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of
- * identifying
- * their output.
+ * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
+ * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
+ * transform, so even redundant or retried bundles will have a unique way of identifying their
+ * output.
*
* <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
* guarantee is important; if a bundle is to be output to a file, for example, the name of the file
* will encode the unique bundle id to avoid conflicts with other writers.
*
- * {@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
- * filenames, and this policy object can be used to write windowed or triggered
- * PCollections into separate files per window pane. This allows file output from unbounded
- * PCollections, and also works for bounded PCollecctions.
+ * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
+ * filenames, and this policy object can be used to write windowed or triggered PCollections into
+ * separate files per window pane. This allows file output from unbounded PCollections, and also
+ * works for bounded PCollections.
*
* <p>Supported file systems are those registered with {@link FileSystems}.
*
- * @param <T> the type of values written to the sink.
+ * @param <OutputT> the type of values written to the sink.
*/
@Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
/**
@@ -173,7 +177,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
@Override
- public String getFilenameSuffix() {
+ public String getSuggestedFilenameSuffix() {
return filenameSuffix;
}
@@ -205,6 +209,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
}
+ private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+
/**
* The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
* underlying channel. The default is to not compress the output using
@@ -213,8 +219,70 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private final WritableByteChannelFactory writableByteChannelFactory;
/**
- * A naming policy for output files.
+ * A class that allows value-dependent writes in {@link FileBasedSink}.
+ *
+ * <p>Users can define a custom type to represent destinations, and provide a mapping to turn this
+ * destination type into an instance of {@link FilenamePolicy}.
*/
+ @Experimental(Kind.FILESYSTEM)
+ public abstract static class DynamicDestinations<UserT, DestinationT>
+ implements HasDisplayData, Serializable {
+ /**
+ * Returns an object that represents at a high level the destination being written to. May not
+ * return null.
+ */
+ public abstract DestinationT getDestination(UserT element);
+
+ /**
+ * Returns the default destination. This is used for collections that have no elements as the
+ * destination to write empty files to.
+ */
+ public abstract DestinationT getDefaultDestination();
+
+ /**
+ * Returns the coder for {@link DestinationT}. If this is not overridden, then the coder
+ * registry will be used to find a suitable coder. This must be a deterministic coder, as {@link
+ * DestinationT} will be used as a key type in a {@link
+ * org.apache.beam.sdk.transforms.GroupByKey}.
+ */
+ @Nullable
+ public Coder<DestinationT> getDestinationCoder() {
+ return null;
+ }
+
+ /** Converts a destination into a {@link FilenamePolicy}. May not return null. */
+ public abstract FilenamePolicy getFilenamePolicy(DestinationT destination);
+
+ /** Populates the display data. */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {}
+
+ // Gets the destination coder. If the user does not provide one, try to find one in the coder
+ // registry. If no coder can be found, throws CannotProvideCoderException.
+ final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ Coder<DestinationT> destinationCoder = getDestinationCoder();
+ if (destinationCoder != null) {
+ return destinationCoder;
+ }
+ // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
+ // We must first use reflection to figure out what the type parameter is.
+ TypeDescriptor<?> superDescriptor =
+ TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
+ if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
+ throw new AssertionError(
+ "Couldn't find the DynamicDestinations superclass of " + this.getClass());
+ }
+ TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
+ @SuppressWarnings("unchecked")
+ TypeDescriptor<DestinationT> descriptor =
+ (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+ return registry.getCoder(descriptor);
+ }
+ }
+
+ /** A naming policy for output files. */
+ @Experimental(Kind.FILESYSTEM)
public abstract static class FilenamePolicy implements Serializable {
/**
* Context used for generating a name based on shard number, and num shards.
@@ -287,29 +355,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* When a sink has requested windowed or triggered output, this method will be invoked to return
* the file {@link ResourceId resource} to be created given the base output directory and a
- * (possibly empty) extension from {@link FileBasedSink} configuration
- * (e.g., {@link CompressionType}).
+ * {@link OutputFileHints} containing information about the file, including a suggested
+ * extension (e.g. coming from {@link CompressionType}).
*
- * <p>The {@link WindowedContext} object gives access to the window and pane,
- * as well as sharding information. The policy must return unique and consistent filenames
- * for different windows and panes.
+ * <p>The {@link WindowedContext} object gives access to the window and pane, as well as
+ * sharding information. The policy must return unique and consistent filenames for different
+ * windows and panes.
*/
@Experimental(Kind.FILESYSTEM)
- public abstract ResourceId windowedFilename(
- ResourceId outputDirectory, WindowedContext c, String extension);
+ public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints);
/**
* When a sink has not requested windowed or triggered output, this method will be invoked to
* return the file {@link ResourceId resource} to be created given the base output directory and
- * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration
- * (e.g., {@link CompressionType}).
+ * a {@link OutputFileHints} containing information about the file, including a suggested
+ * extension (e.g. coming from {@link CompressionType}).
*
* <p>The {@link Context} object only provides sharding information, which is used by the policy
* to generate unique and consistent filenames.
*/
@Experimental(Kind.FILESYSTEM)
- @Nullable public abstract ResourceId unwindowedFilename(
- ResourceId outputDirectory, Context c, String extension);
+ @Nullable
+ public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints);
/**
* Populates the display data.
@@ -318,19 +385,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
}
- /** The policy used to generate names of files to be produced. */
- private final FilenamePolicy filenamePolicy;
/** The directory to which files will be written. */
- private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
-
- /**
- * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
- */
- @Experimental(Kind.FILESYSTEM)
- public FileBasedSink(
- ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
- this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
- }
+ private final ValueProvider<ResourceId> tempDirectoryProvider;
private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
@Override
@@ -340,95 +396,91 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
/**
- * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
+ * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files.
*/
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
- ValueProvider<ResourceId> baseOutputDirectoryProvider,
- FilenamePolicy filenamePolicy,
+ ValueProvider<ResourceId> tempDirectoryProvider,
+ DynamicDestinations<?, DestinationT> dynamicDestinations) {
+ this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
+ }
+
+ /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
+ @Experimental(Kind.FILESYSTEM)
+ public FileBasedSink(
+ ValueProvider<ResourceId> tempDirectoryProvider,
+ DynamicDestinations<?, DestinationT> dynamicDestinations,
WritableByteChannelFactory writableByteChannelFactory) {
- this.baseOutputDirectoryProvider =
- NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory());
- this.filenamePolicy = filenamePolicy;
+ this.tempDirectoryProvider =
+ NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
+ this.dynamicDestinations = checkNotNull(dynamicDestinations);
this.writableByteChannelFactory = writableByteChannelFactory;
}
- /**
- * Returns the base directory inside which files will be written according to the configured
- * {@link FilenamePolicy}.
- */
- @Experimental(Kind.FILESYSTEM)
- public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
- return baseOutputDirectoryProvider;
+ /** Return the {@link DynamicDestinations} used. */
+ @SuppressWarnings("unchecked")
+ public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() {
+ return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
}
/**
- * Returns the policy by which files will be named inside of the base output directory. Note that
- * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
- * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
+ * Returns the directory inside which temporary files will be written according to the configured
+ * {@link FilenamePolicy}.
*/
@Experimental(Kind.FILESYSTEM)
- public final FilenamePolicy getFilenamePolicy() {
- return filenamePolicy;
+ public ValueProvider<ResourceId> getTempDirectoryProvider() {
+ return tempDirectoryProvider;
}
public void validate(PipelineOptions options) {}
- /**
- * Return a subclass of {@link WriteOperation} that will manage the write
- * to the sink.
- */
- public abstract WriteOperation<T> createWriteOperation();
+ /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */
+ public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
public void populateDisplayData(DisplayData.Builder builder) {
- getFilenamePolicy().populateDisplayData(builder);
+ getDynamicDestinations().populateDisplayData(builder);
}
/**
* Abstract operation that manages the process of writing to {@link FileBasedSink}.
*
- * <p>The primary responsibilities of the WriteOperation is the management of output
- * files. During a write, {@link Writer}s write bundles to temporary file
- * locations. After the bundles have been written,
+ * <p>The primary responsibility of the WriteOperation is the management of output files. During
+ * a write, {@link Writer}s write bundles to temporary file locations. After the bundles have been
+ * written,
+ *
* <ol>
- * <li>{@link WriteOperation#finalize} is given a list of the temporary
- * files containing the output bundles.
- * <li>During finalize, these temporary files are copied to final output locations and named
- * according to a file naming template.
- * <li>Finally, any temporary files that were created during the write are removed.
+ * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+ * output bundles.
+ * <li>During finalize, these temporary files are copied to final output locations and named
+ * according to a file naming template.
+ * <li>Finally, any temporary files that were created during the write are removed.
* </ol>
*
- * <p>Subclass implementations of WriteOperation must implement
- * {@link WriteOperation#createWriter} to return a concrete
- * FileBasedSinkWriter.
+ * <p>Subclass implementations of WriteOperation must implement {@link
+ * WriteOperation#createWriter} to return a concrete FileBasedSinkWriter.
*
- * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
- * files using the tempDirectory that can be provided via the constructor of
- * WriteOperation. These temporary files will be named
- * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
- * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
- * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
+ * <h2>Temporary and Output File Naming:</h2>
*
- * <p>Final output files are written to baseOutputFilename with the format
- * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
- * written and extension is the file extension. Both baseOutputFilename and extension are required
- * constructor arguments.
+ * <p>During the write, bundles are written to temporary files using the tempDirectory that can be
+ * provided via the constructor of WriteOperation. These temporary files will be named {@code
+ * {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. For example, if
+ * tempDirectory is "gs://my-bucket/my_temp_output", the output for a bundle with bundle id 15723
+ * will be "gs://my-bucket/my_temp_output/15723".
*
- * <p>Subclass implementations can change the file naming template by supplying a value for
- * fileNamingTemplate.
+ * <p>Final output files are written to the location specified by the {@link FilenamePolicy}. If
+ * no filename policy is specified, then the {@link DefaultFilenamePolicy} will be used. The
+ * directory that the files are written to is determined by the {@link FilenamePolicy} instance.
*
* <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
* files will occur.
*
* <p>If there are no elements in the PCollection being written, no output will be generated.
*
- * @param <T> the type of values written to the sink.
+ * @param <OutputT> the type of values written to the sink.
*/
- public abstract static class WriteOperation<T> implements Serializable {
- /**
- * The Sink that this WriteOperation will write to.
- */
- protected final FileBasedSink<T> sink;
+ public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable {
+ /** The Sink that this WriteOperation will write to. */
+ protected final FileBasedSink<OutputT, DestinationT> sink;
/** Directory for temporary output files. */
protected final ValueProvider<ResourceId> tempDirectory;
@@ -445,17 +497,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
/**
- * Constructs a WriteOperation using the default strategy for generating a temporary
- * directory from the base output filename.
+ * Constructs a WriteOperation using the default strategy for generating a temporary directory
+ * from the base output filename.
*
- * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
- * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
+ * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if
+ * tempDirectory is /path/to/foo/, the temporary directory will be
+ * /path/to/foo/.temp-beam-$timestamp-$tempId.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
- public WriteOperation(FileBasedSink<T> sink) {
- this(sink, NestedValueProvider.of(
- sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
+ public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
+ this(
+ sink,
+ NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder()));
}
private static class TemporaryDirectoryBuilder
@@ -471,10 +525,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private final Long tempId = TEMP_COUNT.getAndIncrement();
@Override
- public ResourceId apply(ResourceId baseOutputDirectory) {
+ public ResourceId apply(ResourceId tempDirectory) {
// Temp directory has a timestamp and a unique ID
String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
- return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
+ return tempDirectory
+ .getCurrentDirectory()
+ .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
}
}
@@ -485,22 +541,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* @param tempDirectory the base directory to be used for temporary output files.
*/
@Experimental(Kind.FILESYSTEM)
- public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
+ public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) {
this(sink, StaticValueProvider.of(tempDirectory));
}
private WriteOperation(
- FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
+ FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) {
this.sink = sink;
this.tempDirectory = tempDirectory;
this.windowedWrites = false;
}
/**
- * Clients must implement to return a subclass of {@link Writer}. This
- * method must not mutate the state of the object.
+ * Clients must implement to return a subclass of {@link Writer}. This method must not mutate
+ * the state of the object.
*/
- public abstract Writer<T> createWriter() throws Exception;
+ public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
/**
* Indicates that the operation will be performing windowed writes.
@@ -514,8 +570,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* removing temporary files.
*
* <p>Finalization may be overridden by subclass implementations to perform customized
- * finalization (e.g., initiating some operation on output bundles, merging them, etc.).
- * {@code writerResults} contains the filenames of written bundles.
+ * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
+ * writerResults} contains the filenames of written bundles.
*
* <p>If subclasses override this method, they must guarantee that its implementation is
* idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
@@ -523,7 +579,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
*
* @param writerResults the results of writes (FileResult).
*/
- public void finalize(Iterable<FileResult> writerResults) throws Exception {
+ public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception {
// Collect names of temporary files and rename them.
Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
copyToOutputFiles(outputFilenames);
@@ -542,17 +598,14 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
@Experimental(Kind.FILESYSTEM)
protected final Map<ResourceId, ResourceId> buildOutputFilenames(
- Iterable<FileResult> writerResults) {
+ Iterable<FileResult<DestinationT>> writerResults) {
int numShards = Iterables.size(writerResults);
Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
- FilenamePolicy policy = getSink().getFilenamePolicy();
- ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get();
-
// Either all results have a shard number set (if the sink is configured with a fixed
// number of shards), or they all don't (otherwise).
Boolean isShardNumberSetEverywhere = null;
- for (FileResult result : writerResults) {
+ for (FileResult<DestinationT> result : writerResults) {
boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
if (isShardNumberSetEverywhere == null) {
isShardNumberSetEverywhere = isShardNumberSetHere;
@@ -568,7 +621,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
isShardNumberSetEverywhere = true;
}
- List<FileResult> resultsWithShardNumbers = Lists.newArrayList();
+ List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
if (isShardNumberSetEverywhere) {
resultsWithShardNumbers = Lists.newArrayList(writerResults);
} else {
@@ -577,29 +630,32 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
// case of triggers, the list of FileResult objects in the Finalize iterable is not
// deterministic, and might change over retries. This breaks the assumption below that
// sorting the FileResult objects provides idempotency.
- List<FileResult> sortedByTempFilename =
+ List<FileResult<DestinationT>> sortedByTempFilename =
Ordering.from(
- new Comparator<FileResult>() {
- @Override
- public int compare(FileResult first, FileResult second) {
- String firstFilename = first.getTempFilename().toString();
- String secondFilename = second.getTempFilename().toString();
- return firstFilename.compareTo(secondFilename);
- }
- })
+ new Comparator<FileResult<DestinationT>>() {
+ @Override
+ public int compare(
+ FileResult<DestinationT> first, FileResult<DestinationT> second) {
+ String firstFilename = first.getTempFilename().toString();
+ String secondFilename = second.getTempFilename().toString();
+ return firstFilename.compareTo(secondFilename);
+ }
+ })
.sortedCopy(writerResults);
for (int i = 0; i < sortedByTempFilename.size(); i++) {
resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
}
}
- for (FileResult result : resultsWithShardNumbers) {
+ for (FileResult<DestinationT> result : resultsWithShardNumbers) {
checkArgument(
result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
outputFilenames.put(
result.getTempFilename(),
result.getDestinationFile(
- policy, baseOutputDir, numShards, getSink().getExtension()));
+ getSink().getDynamicDestinations(),
+ numShards,
+ getSink().getWritableByteChannelFactory()));
}
int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
@@ -615,18 +671,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
*
* <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
*
- * <p>Files will be named according to the file naming template. The order of the output files
- * will be the same as the sorted order of the input filenames. In other words, if the input
- * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and
- * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to
- * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
+ * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
+ * will be the same as the sorted order of the input filenames. In other words (when using
+ * {@link DefaultFilenamePolicy}), if the input filenames are ["C", "A", "B"], baseFilename (in
+ * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is
+ * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B
+ * will be copied to dir/file-001-of-003.txt, etc.
*
* @param filenames the filenames of temporary files.
*/
@VisibleForTesting
@Experimental(Kind.FILESYSTEM)
- final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
- throws IOException {
+ final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
int numFiles = filenames.size();
if (numFiles > 0) {
LOG.debug("Copying {} files.", numFiles);
@@ -698,10 +754,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
}
- /**
- * Returns the FileBasedSink for this write operation.
- */
- public FileBasedSink<T> getSink() {
+ /** Returns the FileBasedSink for this write operation. */
+ public FileBasedSink<OutputT, DestinationT> getSink() {
return sink;
}
@@ -719,33 +773,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
}
- /** Returns the extension that will be written to the produced files. */
- protected final String getExtension() {
- String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), "");
- if (!extension.isEmpty() && !extension.startsWith(".")) {
- extension = "." + extension;
- }
- return extension;
+ /** Returns the {@link WritableByteChannelFactory} used. */
+ protected final WritableByteChannelFactory getWritableByteChannelFactory() {
+ return writableByteChannelFactory;
}
/**
- * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
- * implementations provide a method that can write a single value to a
- * {@link WritableByteChannel}.
+ * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass implementations
+ * provide a method that can write a single value to a {@link WritableByteChannel}.
*
* <p>Subclass implementations may also override methods that write headers and footers before and
* after the values in a bundle, respectively, as well as provide a MIME type for the output
* channel.
*
- * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore
- * any access to static members or methods should be thread safe.
+ * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore any
+ * access to static members or methods should be thread safe.
*
- * @param <T> the type of values to write.
+ * @param <OutputT> the type of values to write.
*/
- public abstract static class Writer<T> {
+ public abstract static class Writer<OutputT, DestinationT> {
private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
- private final WriteOperation<T> writeOperation;
+ private final WriteOperation<OutputT, DestinationT> writeOperation;
/** Unique id for this output bundle. */
private String id;
@@ -753,6 +802,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private BoundedWindow window;
private PaneInfo paneInfo;
private int shard = -1;
+ private DestinationT destination;
/** The output file for this bundle. May be null if opening failed. */
private @Nullable ResourceId outputFile;
@@ -772,10 +822,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
*/
private final String mimeType;
- /**
- * Construct a new {@link Writer} that will produce files of the given MIME type.
- */
- public Writer(WriteOperation<T> writeOperation, String mimeType) {
+ /** Construct a new {@link Writer} that will produce files of the given MIME type. */
+ public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) {
checkNotNull(writeOperation);
this.writeOperation = writeOperation;
this.mimeType = mimeType;
@@ -818,28 +866,29 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* id populated for the case of static sharding. In cases where the runner is dynamically
* picking sharding, shard might be set to -1.
*/
- public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard)
+ public final void openWindowed(
+ String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
throws Exception {
if (!getWriteOperation().windowedWrites) {
throw new IllegalStateException("openWindowed called a non-windowed sink.");
}
- open(uId, window, paneInfo, shard);
+ open(uId, window, paneInfo, shard, destination);
}
/**
* Called for each value in the bundle.
*/
- public abstract void write(T value) throws Exception;
+ public abstract void write(OutputT value) throws Exception;
/**
- * Similar to {@link #openWindowed} however for the case where unwindowed writes were
- * requested.
+ * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
*/
- public final void openUnwindowed(String uId, int shard) throws Exception {
+ public final void openUnwindowed(String uId, int shard, DestinationT destination)
+ throws Exception {
if (getWriteOperation().windowedWrites) {
throw new IllegalStateException("openUnwindowed called a windowed sink.");
}
- open(uId, null, null, shard);
+ open(uId, null, null, shard, destination);
}
// Helper function to close a channel, on exception cases.
@@ -855,14 +904,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
}
- private void open(String uId,
- @Nullable BoundedWindow window,
- @Nullable PaneInfo paneInfo,
- int shard) throws Exception {
+ private void open(
+ String uId,
+ @Nullable BoundedWindow window,
+ @Nullable PaneInfo paneInfo,
+ int shard,
+ DestinationT destination)
+ throws Exception {
this.id = uId;
this.window = window;
this.paneInfo = paneInfo;
this.shard = shard;
+ this.destination = destination;
ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
verifyNotNull(
@@ -908,7 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
/** Closes the channel and returns the bundle result. */
- public final FileResult close() throws Exception {
+ public final FileResult<DestinationT> close() throws Exception {
checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
LOG.debug("Writing footer to {}.", outputFile);
@@ -938,35 +991,41 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
}
- FileResult result = new FileResult(outputFile, shard, window, paneInfo);
+ FileResult<DestinationT> result =
+ new FileResult<>(outputFile, shard, window, paneInfo, destination);
LOG.debug("Result for bundle {}: {}", this.id, outputFile);
return result;
}
- /**
- * Return the WriteOperation that this Writer belongs to.
- */
- public WriteOperation<T> getWriteOperation() {
+ /** Return the WriteOperation that this Writer belongs to. */
+ public WriteOperation<OutputT, DestinationT> getWriteOperation() {
return writeOperation;
}
}
/**
- * Result of a single bundle write. Contains the filename produced by the bundle, and if known
- * the final output filename.
+ * Result of a single bundle write. Contains the filename produced by the bundle, and if known the
+ * final output filename.
*/
- public static final class FileResult {
+ public static final class FileResult<DestinationT> {
private final ResourceId tempFilename;
private final int shard;
private final BoundedWindow window;
private final PaneInfo paneInfo;
+ private final DestinationT destination;
@Experimental(Kind.FILESYSTEM)
- public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
+ public FileResult(
+ ResourceId tempFilename,
+ int shard,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ DestinationT destination) {
this.tempFilename = tempFilename;
this.shard = shard;
this.window = window;
this.paneInfo = paneInfo;
+ this.destination = destination;
}
@Experimental(Kind.FILESYSTEM)
@@ -978,8 +1037,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
return shard;
}
- public FileResult withShard(int shard) {
- return new FileResult(tempFilename, shard, window, paneInfo);
+ public FileResult<DestinationT> withShard(int shard) {
+ return new FileResult<>(tempFilename, shard, window, paneInfo, destination);
}
public BoundedWindow getWindow() {
@@ -990,17 +1049,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
return paneInfo;
}
+ public DestinationT getDestination() {
+ return destination;
+ }
+
@Experimental(Kind.FILESYSTEM)
- public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
- int numShards, String extension) {
+ public ResourceId getDestinationFile(
+ DynamicDestinations<?, DestinationT> dynamicDestinations,
+ int numShards,
+ OutputFileHints outputFileHints) {
checkArgument(getShard() != UNKNOWN_SHARDNUM);
checkArgument(numShards > 0);
+ FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
if (getWindow() != null) {
- return policy.windowedFilename(outputDirectory, new WindowedContext(
- getWindow(), getPaneInfo(), getShard(), numShards), extension);
+ return policy.windowedFilename(
+ new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards),
+ outputFileHints);
} else {
- return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards),
- extension);
+ return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints);
}
}
@@ -1014,22 +1080,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
}
- /**
- * A coder for {@link FileResult} objects.
- */
- public static final class FileResultCoder extends StructuredCoder<FileResult> {
+ /** A coder for {@link FileResult} objects. */
+ public static final class FileResultCoder<DestinationT>
+ extends StructuredCoder<FileResult<DestinationT>> {
private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of();
private static final Coder<Integer> SHARD_CODER = VarIntCoder.of();
private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE);
-
private final Coder<BoundedWindow> windowCoder;
+ private final Coder<DestinationT> destinationCoder;
- protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
+ protected FileResultCoder(
+ Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
this.windowCoder = NullableCoder.of(windowCoder);
+ this.destinationCoder = destinationCoder;
}
- public static FileResultCoder of(Coder<BoundedWindow> windowCoder) {
- return new FileResultCoder(windowCoder);
+ public static <DestinationT> FileResultCoder<DestinationT> of(
+ Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
+ return new FileResultCoder<>(windowCoder, destinationCoder);
}
@Override
@@ -1038,8 +1106,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
@Override
- public void encode(FileResult value, OutputStream outStream)
- throws IOException {
+ public void encode(FileResult<DestinationT> value, OutputStream outStream) throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
@@ -1047,17 +1114,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
windowCoder.encode(value.getWindow(), outStream);
PANE_INFO_CODER.encode(value.getPaneInfo(), outStream);
SHARD_CODER.encode(value.getShard(), outStream);
+ destinationCoder.encode(value.getDestination(), outStream);
}
@Override
- public FileResult decode(InputStream inStream)
- throws IOException {
+ public FileResult<DestinationT> decode(InputStream inStream) throws IOException {
String tempFilename = FILENAME_CODER.decode(inStream);
BoundedWindow window = windowCoder.decode(inStream);
PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream);
int shard = SHARD_CODER.decode(inStream);
- return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
- shard, window, paneInfo);
+ DestinationT destination = destinationCoder.decode(inStream);
+ return new FileResult<>(
+ FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
+ shard,
+ window,
+ paneInfo,
+ destination);
}
@Override
@@ -1066,25 +1138,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
windowCoder.verifyDeterministic();
PANE_INFO_CODER.verifyDeterministic();
SHARD_CODER.verifyDeterministic();
+ destinationCoder.verifyDeterministic();
}
}
/**
- * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
- * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
- * would normally be written directly to the {@link WritableByteChannel} passed into
- * {@link WritableByteChannelFactory#create(WritableByteChannel)}.
- *
- * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
- * building {@link DisplayData}.
+ * Provides hints about how to generate output files, such as a suggested filename suffix (e.g.
+ * based on the compression type), and the file MIME type.
*/
- public interface WritableByteChannelFactory extends Serializable {
- /**
- * @param channel the {@link WritableByteChannel} to wrap
- * @return the {@link WritableByteChannel} to be used during output
- */
- WritableByteChannel create(WritableByteChannel channel) throws IOException;
-
+ public interface OutputFileHints extends Serializable {
/**
* Returns the MIME type that should be used for the files that will hold the output data. May
* return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
@@ -1101,6 +1163,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP}
*/
@Nullable
- String getFilenameSuffix();
+ String getSuggestedFilenameSuffix();
+ }
+
+ /**
+ * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
+ * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
+ * would normally be written directly to the {@link WritableByteChannel} passed into {@link
+ * WritableByteChannelFactory#create(WritableByteChannel)}.
+ *
+ * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
+ * building {@link DisplayData}.
+ */
+ public interface WritableByteChannelFactory extends OutputFileHints {
+ /**
+ * @param channel the {@link WritableByteChannel} to wrap
+ * @return the {@link WritableByteChannel} to be used during output
+ */
+ WritableByteChannel create(WritableByteChannel channel) throws IOException;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index e288075..6e7b243 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
@@ -355,12 +356,11 @@ public class TFRecordIO {
public PDone expand(PCollection<byte[]> input) {
checkState(getOutputPrefix() != null,
"need to set the output prefix of a TFRecordIO.Write transform");
- WriteFiles<byte[]> write = WriteFiles.to(
+ WriteFiles<byte[], Void, byte[]> write =
+ WriteFiles.<byte[], Void, byte[]>to(
new TFRecordSink(
- getOutputPrefix(),
- getShardTemplate(),
- getFilenameSuffix(),
- getCompressionType()));
+ getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()),
+ SerializableFunctions.<byte[]>identity());
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -546,20 +546,20 @@ public class TFRecordIO {
}
}
- /**
- * A {@link FileBasedSink} for TFRecord files. Produces TFRecord files.
- */
+ /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */
@VisibleForTesting
- static class TFRecordSink extends FileBasedSink<byte[]> {
+ static class TFRecordSink extends FileBasedSink<byte[], Void> {
@VisibleForTesting
- TFRecordSink(ValueProvider<ResourceId> outputPrefix,
+ TFRecordSink(
+ ValueProvider<ResourceId> outputPrefix,
@Nullable String shardTemplate,
@Nullable String suffix,
TFRecordIO.CompressionType compressionType) {
super(
outputPrefix,
- DefaultFilenamePolicy.constructUsingStandardParameters(
- outputPrefix, shardTemplate, suffix, false),
+ DynamicFileDestinations.constant(
+ DefaultFilenamePolicy.fromStandardParameters(
+ outputPrefix, shardTemplate, suffix, false)),
writableByteChannelFactory(compressionType));
}
@@ -571,7 +571,7 @@ public class TFRecordIO {
}
@Override
- public WriteOperation<byte[]> createWriteOperation() {
+ public WriteOperation<byte[], Void> createWriteOperation() {
return new TFRecordWriteOperation(this);
}
@@ -590,30 +590,24 @@ public class TFRecordIO {
return CompressionType.UNCOMPRESSED;
}
- /**
- * A {@link WriteOperation
- * WriteOperation} for TFRecord files.
- */
- private static class TFRecordWriteOperation extends WriteOperation<byte[]> {
+ /** A {@link WriteOperation WriteOperation} for TFRecord files. */
+ private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> {
private TFRecordWriteOperation(TFRecordSink sink) {
super(sink);
}
@Override
- public Writer<byte[]> createWriter() throws Exception {
+ public Writer<byte[], Void> createWriter() throws Exception {
return new TFRecordWriter(this);
}
}
- /**
- * A {@link Writer Writer}
- * for TFRecord files.
- */
- private static class TFRecordWriter extends Writer<byte[]> {
+ /** A {@link Writer Writer} for TFRecord files. */
+ private static class TFRecordWriter extends Writer<byte[], Void> {
private WritableByteChannel outChannel;
private TFRecordCodec codec;
- private TFRecordWriter(WriteOperation<byte[]> writeOperation) {
+ private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
super(writeOperation, MimeTypes.BINARY);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f1eb7c0..5241589 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
@@ -37,6 +40,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -65,19 +69,8 @@ import org.apache.beam.sdk.values.PDone;
* <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
* {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner -
- * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
- * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
- * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} can also be
- * set in case you need better control over naming files created by unique windows.
- * {@link DefaultFilenamePolicy} policy for producing unique filenames might not be appropriate
- * for your use case.
- *
- * <p>Any existing files with the same names as generated output files will be overwritten.
- *
* <p>For example:
+ *
* <pre>{@code
* // A simple Write to a local file (only runs locally):
* PCollection<String> lines = ...;
@@ -85,10 +78,49 @@ import org.apache.beam.sdk.values.PDone;
*
* // Same as above, only with Gzip compression:
* PCollection<String> lines = ...;
- * lines.apply(TextIO.write().to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt")
* .withSuffix(".txt")
* .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
* }</pre>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may not need to set it yourself. If setting an explicit template using {@link
+ * TextIO.Write#withShardNameTemplate(String)}, make sure that the template contains placeholders
+ * for the window and the pane; W is expanded into the window text, and P into the pane; the default
+ * template will include both the window and the pane in the filename.
+ *
+ * <p>If you want better control over how filenames are generated than the default policy allows, a
+ * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
+ *
+ * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this
+ * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class
+ * allows you to convert any input value into a custom destination object, and map that destination
+ * object to a {@link FilenamePolicy}. This allows using different filename policies (or more
+ * commonly, differently-configured instances of the same policy) based on the input record. Often
+ * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which
+ * allows your {@link DynamicDestinations} object to examine the input type and takes a format
+ * function to convert that type to a string for writing.
+ *
+ * <p>A convenience shortcut is provided for the case where the default naming policy is used, but
+ * different configurations of this policy are wanted based on the input record. Default naming
+ * policies can be configured using the {@link DefaultFilenamePolicy.Params} object.
+ *
+ * <pre>{@code
+ * PCollection<UserEvent> lines = ...;
+ * lines.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent())
+ * .to(new SerializableFunction<UserEvent, Params>() {
+ * public Params apply(UserEvent value) {
+ * return new Params().withBaseFilename(baseDirectory + "/" + value.country());
+ * }
+ * },
+ * new Params().withBaseFilename(baseDirectory + "/empty")));
+ * }</pre>
+ *
+ * <p>Any existing files with the same names as generated output files will be overwritten.
*/
public class TextIO {
/**
@@ -105,11 +137,29 @@ public class TextIO {
* line.
*/
public static Write write() {
- return new AutoValue_TextIO_Write.Builder()
+ return new TextIO.Write();
+ }
+
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
+ * matching a sharding pattern), with each element of the input collection encoded into its own
+ * line.
+ *
+ * <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type
+ * {@code T}, along with a format function that converts the input type {@code T} to the String
+ * that will be written to the file. The advantage of this is it allows a user-provided {@link
+ * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the
+ * user's custom type when choosing a destination.
+ */
+ public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) {
+ return new AutoValue_TextIO_TypedWrite.Builder<T>()
.setFilenamePrefix(null)
+ .setTempDirectory(null)
.setShardTemplate(null)
.setFilenameSuffix(null)
.setFilenamePolicy(null)
+ .setDynamicDestinations(null)
+ .setFormatFunction(formatFunction)
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
.setWindowedWrites(false)
.setNumShards(0)
@@ -223,18 +273,21 @@ public class TextIO {
}
}
-
- /////////////////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
- public abstract static class Write extends PTransform<PCollection<String>, PDone> {
+ public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> {
/** The prefix of each file written, combined with suffix and shardTemplate. */
@Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
/** The suffix of each file written, combined with prefix and shardTemplate. */
@Nullable abstract String getFilenameSuffix();
+ /** The base directory used for generating temporary files. */
+ @Nullable
+ abstract ValueProvider<ResourceId> getTempDirectory();
+
/** An optional header to add to each file. */
@Nullable abstract String getHeader();
@@ -250,6 +303,13 @@ public class TextIO {
/** A policy for naming output files. */
@Nullable abstract FilenamePolicy getFilenamePolicy();
+ /** Allows for value-dependent {@link DynamicDestinations} to be vended. */
+ @Nullable
+ abstract DynamicDestinations<T, ?> getDynamicDestinations();
+
+ /** A function that converts T to a String, for writing to the file. */
+ abstract SerializableFunction<T, String> getFormatFunction();
+
/** Whether to write windowed output files. */
abstract boolean getWindowedWrites();
@@ -259,66 +319,68 @@ public class TextIO {
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();
- abstract Builder toBuilder();
+ abstract Builder<T> toBuilder();
@AutoValue.Builder
- abstract static class Builder {
- abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
- abstract Builder setShardTemplate(@Nullable String shardTemplate);
- abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
- abstract Builder setHeader(@Nullable String header);
- abstract Builder setFooter(@Nullable String footer);
- abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
- abstract Builder setNumShards(int numShards);
- abstract Builder setWindowedWrites(boolean windowedWrites);
- abstract Builder setWritableByteChannelFactory(
+ abstract static class Builder<T> {
+ abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+
+ abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
+ abstract Builder<T> setShardTemplate(@Nullable String shardTemplate);
+
+ abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix);
+
+ abstract Builder<T> setHeader(@Nullable String header);
+
+ abstract Builder<T> setFooter(@Nullable String footer);
+
+ abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
+
+ abstract Builder<T> setDynamicDestinations(
+ @Nullable DynamicDestinations<T, ?> dynamicDestinations);
+
+ abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction);
+
+ abstract Builder<T> setNumShards(int numShards);
+
+ abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+
+ abstract Builder<T> setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);
- abstract Write build();
+ abstract TypedWrite<T> build();
}
/**
- * Writes to text files with the given prefix. The given {@code prefix} can reference any
- * {@link FileSystem} on the classpath.
- *
- * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+ * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link
+ * FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to
+ * generate filenames.
*
* <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix
- * to define the base output directory and file prefix, a shard identifier (see
- * {@link #withNumShards(int)}), and a common suffix (if supplied using
- * {@link #withSuffix(String)}).
+ * to define the base output directory and file prefix, a shard identifier (see {@link
+ * #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}).
+ *
+ * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case
+ * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
+ * Custom filename policies do not automatically see this prefix - you should explicitly pass
+ * the prefix into your {@link FilenamePolicy} object if you need this.
*
- * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
- * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
- * not be set.
+ * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
+ * infer a directory for temporary files.
*/
- public Write to(String filenamePrefix) {
+ public TypedWrite<T> to(String filenamePrefix) {
return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
}
- /**
- * Writes to text files with prefix from the given resource.
- *
- * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
- *
- * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix
- * to define the base output directory and file prefix, a shard identifier (see
- * {@link #withNumShards(int)}), and a common suffix (if supplied using
- * {@link #withSuffix(String)}).
- *
- * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
- * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
- * not be set.
- */
+ /** Like {@link #to(String)}. */
@Experimental(Kind.FILESYSTEM)
- public Write to(ResourceId filenamePrefix) {
+ public TypedWrite<T> to(ResourceId filenamePrefix) {
return toResource(StaticValueProvider.of(filenamePrefix));
}
- /**
- * Like {@link #to(String)}.
- */
- public Write to(ValueProvider<String> outputPrefix) {
+ /** Like {@link #to(String)}. */
+ public TypedWrite<T> to(ValueProvider<String> outputPrefix) {
return toResource(NestedValueProvider.of(outputPrefix,
new SerializableFunction<String, ResourceId>() {
@Override
@@ -329,43 +391,77 @@ public class TextIO {
}
/**
- * Like {@link #to(ResourceId)}.
+ * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
+ * directory for temporary files must be specified using {@link #withTempDirectory}.
*/
+ public TypedWrite<T> to(FilenamePolicy filenamePolicy) {
+ return toBuilder().setFilenamePolicy(filenamePolicy).build();
+ }
+
+ /**
+ * Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. These
+ * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
+ * temporary files must be specified using {@link #withTempDirectory}.
+ */
+ public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
+ return toBuilder().setDynamicDestinations(dynamicDestinations).build();
+ }
+
+ /**
+ * Write to dynamic destinations using the default filename policy. The destinationFunction maps
+ * the input record to a {@link DefaultFilenamePolicy.Params} object that specifies where the
+ * records should be written (base filename, file suffix, and shard template). The
+ * emptyDestination parameter specifies where empty files should be written when the written
+ * {@link PCollection} is empty.
+ */
+ public TypedWrite<T> to(
+ SerializableFunction<T, Params> destinationFunction, Params emptyDestination) {
+ return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination));
+ }
+
+ /** Like {@link #to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
- public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+ public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) {
return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
+ /** Set the base directory used to generate temporary files. */
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ return toBuilder().setTempDirectory(tempDirectory).build();
+ }
+
+ /** Set the base directory used to generate temporary files. */
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) {
+ return withTempDirectory(StaticValueProvider.of(tempDirectory));
+ }
+
/**
* Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
- * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+ * used when using one of the default filename-prefix to() overloads - i.e. not when using
+ * either {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
*
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public Write withShardNameTemplate(String shardTemplate) {
+ public TypedWrite<T> withShardNameTemplate(String shardTemplate) {
return toBuilder().setShardTemplate(shardTemplate).build();
}
/**
- * Configures the filename suffix for written files. This option may only be used when
- * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+ * Configures the filename suffix for written files. This option may only be used when using one
+ * of the default filename-prefix to() overloads - i.e. not when using either {@link
+ * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
*
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public Write withSuffix(String filenameSuffix) {
+ public TypedWrite<T> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
/**
- * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
- */
- public Write withFilenamePolicy(FilenamePolicy filenamePolicy) {
- return toBuilder().setFilenamePolicy(filenamePolicy).build();
- }
-
- /**
* Configures the number of output shards produced overall (when using unwindowed writes) or
* per-window (when using windowed writes).
*
@@ -375,14 +471,13 @@ public class TextIO {
*
* @param numShards the number of shards to use, or 0 to let the system decide.
*/
- public Write withNumShards(int numShards) {
+ public TypedWrite<T> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return toBuilder().setNumShards(numShards).build();
}
/**
- * Forces a single file as output and empty shard name template. This option is only compatible
- * with unwindowed writes.
+ * Forces a single file as output and empty shard name template.
*
* <p>For unwindowed writes, constraining the number of shards is likely to reduce the
* performance of a pipeline. Setting this value is not recommended unless you require a
@@ -390,7 +485,7 @@ public class TextIO {
*
* <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public Write withoutSharding() {
+ public TypedWrite<T> withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
}
@@ -399,7 +494,7 @@ public class TextIO {
*
* <p>A {@code null} value will clear any previously configured header.
*/
- public Write withHeader(@Nullable String header) {
+ public TypedWrite<T> withHeader(@Nullable String header) {
return toBuilder().setHeader(header).build();
}
@@ -408,48 +503,82 @@ public class TextIO {
*
* <p>A {@code null} value will clear any previously configured footer.
*/
- public Write withFooter(@Nullable String footer) {
+ public TypedWrite<T> withFooter(@Nullable String footer) {
return toBuilder().setFooter(footer).build();
}
/**
- * Returns a transform for writing to text files like this one but that has the given
- * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
- * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ * Returns a transform for writing to text files like this one but that has the given {@link
+ * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
+ * default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
*
* <p>A {@code null} value will reset the value to the default value mentioned above.
*/
- public Write withWritableByteChannelFactory(
+ public TypedWrite<T> withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
}
- public Write withWindowedWrites() {
+ /**
+ * Preserves windowing of input elements and writes them to files based on the element's window.
+ *
+ * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
+ * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
+ */
+ public TypedWrite<T> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
+ private DynamicDestinations<T, ?> resolveDynamicDestinations() {
+ DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
+ if (dynamicDestinations == null) {
+ FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+ if (usedFilenamePolicy == null) {
+ usedFilenamePolicy =
+ DefaultFilenamePolicy.fromStandardParameters(
+ getFilenamePrefix(),
+ getShardTemplate(),
+ getFilenameSuffix(),
+ getWindowedWrites());
+ }
+ dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy);
+ }
+ return dynamicDestinations;
+ }
+
@Override
- public PDone expand(PCollection<String> input) {
- checkState(getFilenamePrefix() != null,
- "Need to set the filename prefix of a TextIO.Write transform.");
+ public PDone expand(PCollection<T> input) {
+ checkState(
+ getFilenamePrefix() != null || getTempDirectory() != null,
+ "Need to set either the filename prefix or the tempDirectory of a TextIO.Write "
+ + "transform.");
checkState(
- (getFilenamePolicy() == null)
- || (getShardTemplate() == null && getFilenameSuffix() == null),
- "Cannot set a filename policy and also a filename template or suffix.");
-
- FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
- if (usedFilenamePolicy == null) {
- usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
- getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+ getFilenamePolicy() == null || getDynamicDestinations() == null,
+ "Cannot specify both a filename policy and dynamic destinations");
+ if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
+ checkState(
+ getShardTemplate() == null && getFilenameSuffix() == null,
+ "shardTemplate and filenameSuffix should only be used with the default "
+ + "filename policy");
}
- WriteFiles<String> write =
+ return expandTyped(input, resolveDynamicDestinations());
+ }
+
+ public <DestinationT> PDone expandTyped(
+ PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+ ValueProvider<ResourceId> tempDirectory = getTempDirectory();
+ if (tempDirectory == null) {
+ tempDirectory = getFilenamePrefix();
+ }
+ WriteFiles<T, DestinationT, String> write =
WriteFiles.to(
- new TextSink(
- getFilenamePrefix(),
- usedFilenamePolicy,
+ new TextSink<>(
+ tempDirectory,
+ dynamicDestinations,
getHeader(),
getFooter(),
- getWritableByteChannelFactory()));
+ getWritableByteChannelFactory()),
+ getFormatFunction());
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -463,27 +592,26 @@ public class TextIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String prefixString = "";
- if (getFilenamePrefix() != null) {
- prefixString = getFilenamePrefix().isAccessible()
- ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString();
+ resolveDynamicDestinations().populateDisplayData(builder);
+ String tempDirectory = null;
+ if (getTempDirectory() != null) {
+ tempDirectory =
+ getTempDirectory().isAccessible()
+ ? getTempDirectory().get().toString()
+ : getTempDirectory().toString();
}
builder
- .addIfNotNull(DisplayData.item("filePrefix", prefixString)
- .withLabel("Output File Prefix"))
- .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
- .withLabel("Output File Suffix"))
- .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
- .withLabel("Output Shard Name Template"))
- .addIfNotDefault(DisplayData.item("numShards", getNumShards())
- .withLabel("Maximum Output Shards"), 0)
- .addIfNotNull(DisplayData.item("fileHeader", getHeader())
- .withLabel("File Header"))
- .addIfNotNull(DisplayData.item("fileFooter", getFooter())
- .withLabel("File Footer"))
- .add(DisplayData
- .item("writableByteChannelFactory", getWritableByteChannelFactory().toString())
- .withLabel("Compression/Transformation Type"));
+ .addIfNotDefault(
+ DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
+ .addIfNotNull(
+ DisplayData.item("tempDirectory", tempDirectory)
+ .withLabel("Directory for temporary files"))
+ .addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header"))
+ .addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer"))
+ .add(
+ DisplayData.item(
+ "writableByteChannelFactory", getWritableByteChannelFactory().toString())
+ .withLabel("Compression/Transformation Type"));
}
@Override
@@ -493,6 +621,128 @@ public class TextIO {
}
/**
+ * This class is used as the default return value of {@link TextIO#write()}.
+ *
+ * <p>All methods in this class delegate to the appropriate method of {@link TextIO.TypedWrite}.
+ * This class exists for backwards compatibility, and will be removed in Beam 3.0.
+ */
+ public static class Write extends PTransform<PCollection<String>, PDone> {
+ @VisibleForTesting TypedWrite<String> inner;
+
+ Write() {
+ this(TextIO.writeCustomType(SerializableFunctions.<String>identity()));
+ }
+
+ Write(TypedWrite<String> inner) {
+ this.inner = inner;
+ }
+
+ /** See {@link TypedWrite#to(String)}. */
+ public Write to(String filenamePrefix) {
+ return new Write(inner.to(filenamePrefix));
+ }
+
+ /** See {@link TypedWrite#to(ResourceId)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write to(ResourceId filenamePrefix) {
+ return new Write(inner.to(filenamePrefix));
+ }
+
+ /** See {@link TypedWrite#to(ValueProvider)}. */
+ public Write to(ValueProvider<String> outputPrefix) {
+ return new Write(inner.to(outputPrefix));
+ }
+
+ /** See {@link TypedWrite#toResource(ValueProvider)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+ return new Write(inner.toResource(filenamePrefix));
+ }
+
+ /** See {@link TypedWrite#to(FilenamePolicy)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write to(FilenamePolicy filenamePolicy) {
+ return new Write(inner.to(filenamePolicy));
+ }
+
+ /** See {@link TypedWrite#to(DynamicDestinations)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write to(DynamicDestinations<String, ?> dynamicDestinations) {
+ return new Write(inner.to(dynamicDestinations));
+ }
+
+ /** See {@link TypedWrite#to(SerializableFunction, Params)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write to(
+ SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
+ return new Write(inner.to(destinationFunction, emptyDestination));
+ }
+
+ /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ return new Write(inner.withTempDirectory(tempDirectory));
+ }
+
+ /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write withTempDirectory(ResourceId tempDirectory) {
+ return new Write(inner.withTempDirectory(tempDirectory));
+ }
+
+ /** See {@link TypedWrite#withShardNameTemplate(String)}. */
+ public Write withShardNameTemplate(String shardTemplate) {
+ return new Write(inner.withShardNameTemplate(shardTemplate));
+ }
+
+ /** See {@link TypedWrite#withSuffix(String)}. */
+ public Write withSuffix(String filenameSuffix) {
+ return new Write(inner.withSuffix(filenameSuffix));
+ }
+
+ /** See {@link TypedWrite#withNumShards(int)}. */
+ public Write withNumShards(int numShards) {
+ return new Write(inner.withNumShards(numShards));
+ }
+
+ /** See {@link TypedWrite#withoutSharding()}. */
+ public Write withoutSharding() {
+ return new Write(inner.withoutSharding());
+ }
+
+ /** See {@link TypedWrite#withHeader(String)}. */
+ public Write withHeader(@Nullable String header) {
+ return new Write(inner.withHeader(header));
+ }
+
+ /** See {@link TypedWrite#withFooter(String)}. */
+ public Write withFooter(@Nullable String footer) {
+ return new Write(inner.withFooter(footer));
+ }
+
+ /** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */
+ public Write withWritableByteChannelFactory(
+ WritableByteChannelFactory writableByteChannelFactory) {
+ return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory));
+ }
+
+ /** See {@link TypedWrite#withWindowedWrites}. */
+ public Write withWindowedWrites() {
+ return new Write(inner.withWindowedWrites());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ inner.populateDisplayData(builder);
+ }
+
+ @Override
+ public PDone expand(PCollection<String> input) {
+ return inner.expand(input);
+ }
+ }
+
+ /**
* Possible text file compression types.
*/
public enum CompressionType {